Apache Kafka integration provider for Apache Airflow enabling workflows with Kafka message queues and streaming data platforms
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Message publishing capabilities with support for synchronous and asynchronous operations, custom delivery callbacks, and templated producer functions.
Airflow operator for producing messages to Kafka topics with flexible producer function support and delivery confirmation options.
class ProduceToTopicOperator(BaseOperator):
    """
    Operator for producing messages to a Kafka topic.

    Attributes:
        template_fields: tuple = ("topic", "producer_function_args", "producer_function_kwargs", "kafka_config_id")
    """

    def __init__(
        self,
        topic: str,
        producer_function: str | Callable[..., Any],
        kafka_config_id: str = "kafka_default",
        producer_function_args: Sequence[Any] | None = None,
        producer_function_kwargs: dict[Any, Any] | None = None,
        delivery_callback: str | None = None,
        synchronous: bool = True,
        poll_timeout: float = 0,
        **kwargs: Any
    ) -> None:
        """
        Initialize the producer operator.

        Args:
            topic: Kafka topic name to produce to
            producer_function: Function (or dotted-path string) that takes a producer and produces messages
            kafka_config_id: Airflow connection ID for Kafka configuration
            producer_function_args: Arguments to pass to producer function
            producer_function_kwargs: Keyword arguments to pass to producer function
            delivery_callback: Callback function name for delivery confirmations
            synchronous: Whether to wait for delivery confirmation (defaults to True)
            poll_timeout: Timeout for polling delivery confirmations
            **kwargs: Additional operator arguments forwarded to BaseOperator
        """

    def execute(self, context) -> None:
        """
        Execute the producer operation.

        Args:
            context: Airflow task context
"""Lower-level hook providing direct access to Kafka Producer client for advanced use cases.
class KafkaProducerHook(KafkaBaseHook):
    """
    A hook for producing messages to Kafka topics.

    Inherits from KafkaBaseHook and provides producer client access.
    """

    def __init__(self, kafka_config_id: str = "kafka_default") -> None:
        """
        Initialize the Kafka producer hook.

        Args:
            kafka_config_id: The Airflow connection ID holding the Kafka configuration
        """

    def get_producer(self) -> Producer:
        """
        Get Kafka Producer client.

        Returns:
            Producer: Configured confluent-kafka Producer instance
        """

    def _get_client(self, config) -> Producer:
        """
        Get a Kafka Producer client with the given configuration.

        Args:
            config: Kafka client configuration dictionary

        Returns:
            Producer: Configured confluent-kafka Producer instance
"""Default and custom callback functions for handling message delivery confirmations.
def acked(err, msg):
    """
    Default delivery callback for message acknowledgments.

    This callback is automatically called by the producer when a message
    delivery attempt completes (either successfully or with failure).

    Args:
        err: Error object if delivery failed, None if successful
        msg: Message object with delivery details including topic, partition, offset
"""from airflow import DAG
from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator
from datetime import datetime
def simple_producer(producer):
    """Queue a fixed set of short text messages on ``my-topic``."""
    # Each word becomes its own message on the same topic.
    for word in ("Hello", "World", "from", "Airflow"):
        producer.produce("my-topic", value=word)
    # Block until every queued message has been handed to the broker.
    producer.flush()
# Example DAG: trigger-only (no schedule), no backfill of past dates.
dag = DAG(
    "kafka_producer_example",
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False
)
# Task that runs simple_producer against the kafka_default connection.
produce_task = ProduceToTopicOperator(
    task_id="produce_messages",
    topic="my-topic",
    producer_function=simple_producer,
    kafka_config_id="kafka_default",
    dag=dag
)

import json
from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator
def json_producer(producer):
    """Publish keyed JSON event records to the ``user-events`` topic."""
    events = [
        {"user_id": 1, "action": "login", "timestamp": "2023-01-01T10:00:00Z"},
        {"user_id": 2, "action": "purchase", "timestamp": "2023-01-01T10:01:00Z"},
        {"user_id": 1, "action": "logout", "timestamp": "2023-01-01T10:02:00Z"},
    ]
    for event in events:
        # Key by user id so all of a user's events land on one partition.
        producer.produce(
            "user-events",
            key=str(event["user_id"]),
            value=json.dumps(event),
            headers={"content-type": "application/json"},
        )
    producer.flush()
# Task publishing keyed JSON events through the default Kafka connection.
produce_json_task = ProduceToTopicOperator(
    task_id="produce_json_data",
    topic="user-events",
    producer_function=json_producer,
    kafka_config_id="kafka_default"
)

from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator
def parameterized_producer(producer, batch_size, message_prefix):
    """Produce ``batch_size`` sequentially numbered, prefixed messages."""
    for index in range(batch_size):
        # Zero-pad the sequence number to four digits, e.g. "BATCH-0042".
        producer.produce("batch-topic", value=f"{message_prefix}-{index:04d}")
    producer.flush()
    print(f"Produced {batch_size} messages with prefix '{message_prefix}'")
# Task passing positional arguments through to the producer function.
produce_batch_task = ProduceToTopicOperator(
    task_id="produce_batch",
    topic="batch-topic",
    producer_function=parameterized_producer,
    producer_function_args=[100, "BATCH"],  # batch_size=100, prefix="BATCH"
    kafka_config_id="kafka_default"
)

from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator
def custom_delivery_callback(err, msg):
    """Log per-message delivery success or failure to stdout."""
    if err is None:
        # Success: report where the message landed.
        print(f"Message delivered to {msg.topic()}[{msg.partition()}] at offset {msg.offset()}")
    else:
        print(f"Message delivery failed: {err}")
def producer_with_callback(producer):
    """Producer using custom delivery callback."""
    for i in range(5):
        producer.produce(
            "callback-topic",
            value=f"Message {i}",
            callback=custom_delivery_callback
        )
        # Poll for delivery reports
        # NOTE(review): indentation was lost in this source; poll is
        # assumed to run once per produced message — confirm upstream.
        producer.poll(1.0)
    producer.flush()
# Task resolving the delivery callback by name at runtime.
produce_with_callback = ProduceToTopicOperator(
    task_id="produce_with_callback",
    topic="callback-topic",
    producer_function=producer_with_callback,
    delivery_callback="custom_delivery_callback",
    kafka_config_id="kafka_default"
)

from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator
def async_producer(producer):
    """Fire-and-forget production of 1000 keyed messages."""
    for n in range(1000):
        producer.produce(
            "high-volume-topic",
            value=f"Message {n}",
            key=str(n % 10)  # spread messages over up to 10 partitions
        )
        # Every 100 messages, serve pending delivery reports without blocking.
        if n % 100 == 0:
            producer.poll(0)
    producer.flush()
# High-volume task that does not block on per-message confirmations.
async_produce_task = ProduceToTopicOperator(
    task_id="async_produce",
    topic="high-volume-topic",
    producer_function=async_producer,
    synchronous=False,  # Don't wait for individual confirmations
    kafka_config_id="kafka_default"
)

from airflow.operators.python import PythonOperator
from airflow.providers.apache.kafka.hooks.produce import KafkaProducerHook
def custom_producer_logic():
    """Use the producer hook directly for fine-grained control.

    Demonstrates explicit partition assignment and a bounded flush.
    Errors are logged and re-raised so the Airflow task is marked failed.
    """
    hook = KafkaProducerHook(kafka_config_id="kafka_default")
    producer = hook.get_producer()
    try:
        for i in range(10):
            # produce() is asynchronous and returns None (confluent-kafka);
            # delivery is confirmed via flush()/poll(), not a future object.
            producer.produce(
                "custom-topic",
                value=f"Custom message {i}",
                partition=i % 3  # Explicit partition assignment
            )
        # Wait up to 10s for all queued messages; flush() returns the
        # number of messages still undelivered when the timeout expires.
        remaining = producer.flush(timeout=10)
        if remaining > 0:
            raise RuntimeError(f"{remaining} messages failed to deliver")
    except Exception as e:
        print(f"Production failed: {e}")
        raise
    # No close() here: confluent-kafka's Producer has no close() method;
    # flush() is the correct completion step.
# Run the hook-based producer inside a plain PythonOperator.
hook_task = PythonOperator(
    task_id="custom_producer",
    python_callable=custom_producer_logic
)

# Connection extra configuration for optimized production
{
"bootstrap.servers": "kafka:9092",
"acks": "all", # Wait for all replicas
"retries": "2147483647", # Retry indefinitely
"batch.size": "16384", # Batch size in bytes
"linger.ms": "5", # Wait up to 5ms for batching
"compression.type": "snappy", # Enable compression
"max.in.flight.requests.per.connection": "5",
"enable.idempotence": "true" # Exactly-once semantics
}

def robust_producer(producer):
"""Producer with comprehensive error handling."""
messages_sent = 0
failed_messages = []
def delivery_report(err, msg):
nonlocal messages_sent, failed_messages
if err is not None:
failed_messages.append((msg.key(), msg.value(), str(err)))
else:
messages_sent += 1
try:
for i in range(100):
producer.produce(
"robust-topic",
key=str(i),
value=f"Message {i}",
callback=delivery_report
)
# Poll periodically
if i % 10 == 0:
producer.poll(0.1)
# Final flush with timeout
remaining = producer.flush(timeout=30)
if remaining > 0:
raise Exception(f"{remaining} messages failed to deliver")
print(f"Successfully sent {messages_sent} messages")
if failed_messages:
print(f"Failed messages: {failed_messages}")
except Exception as e:
print(f"Producer error: {e}")
raise
# Task wiring the error-handling producer to the default connection.
robust_task = ProduceToTopicOperator(
    task_id="robust_producer",
    topic="robust-topic",
    producer_function=robust_producer,
    kafka_config_id="kafka_default"
)

Tips:
- Tune `linger.ms` and `batch.size` for optimal throughput.
- Call `producer.flush()` to ensure message delivery.

Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-apache-kafka