Apache Kafka integration provider for Apache Airflow, enabling workflows to interact with Kafka message queues and streaming data platforms.
Message consumption capabilities with batch and individual processing, configurable commit strategies, and message filtering functions.
Airflow operator for consuming messages from Kafka topics with flexible processing functions and configurable commit strategies.
class ConsumeFromTopicOperator(BaseOperator):
    """
    Operator for consuming messages from Kafka topics.

    Attributes:
        template_fields: tuple = ("topics", "apply_function_args", "apply_function_kwargs", "kafka_config_id")
    """

    def __init__(
        self,
        topics: str | Sequence[str],
        kafka_config_id: str = "kafka_default",
        apply_function: Callable[..., Any] | str | None = None,
        apply_function_batch: Callable[..., Any] | str | None = None,
        apply_function_args: Sequence[Any] | None = None,
        apply_function_kwargs: dict[Any, Any] | None = None,
        commit_cadence: str = "end_of_operator",
        max_messages: int | None = None,
        max_batch_size: int = 1000,
        poll_timeout: float = 60,
        **kwargs: Any
    ) -> None:
        """
        Initialize the consumer operator.

        Args:
            topics: Kafka topic name(s) to consume from
            kafka_config_id: Airflow connection ID for Kafka configuration
            apply_function: Function to apply to each message
            apply_function_batch: Function to apply to batches of messages
            apply_function_args: Arguments to pass to the apply function
            apply_function_kwargs: Keyword arguments to pass to the apply function
            commit_cadence: When to commit offsets ("never", "end_of_batch", "end_of_operator")
            max_messages: Maximum number of messages to consume
            max_batch_size: Maximum messages per batch
            poll_timeout: Timeout for polling messages (seconds)
            **kwargs: Additional operator arguments
        """

    def execute(self, context) -> Any:
        """
        Execute the consumer operation.

        Args:
            context: Airflow task context

        Returns:
            Any: Result of apply function(s)
        """
Lower-level hook providing direct access to the Kafka Consumer client for advanced use cases.

class KafkaConsumerHook(KafkaBaseHook):
    """
    A hook for consuming messages from Kafka topics.

    Inherits from KafkaBaseHook and provides consumer client access.
    """

    def __init__(self, topics: Sequence[str], kafka_config_id: str = "kafka_default") -> None:
        """
        Initialize the Kafka consumer hook.

        Args:
            topics: List of topic names to subscribe to
            kafka_config_id: The connection object to use
        """

    def get_consumer(self) -> Consumer:
        """
        Get the Kafka Consumer client.

        Returns:
            Consumer: Configured confluent-kafka Consumer instance
        """

    def _get_client(self, config) -> Consumer:
        """
        Get a Kafka Consumer client with the given configuration.

        Args:
            config: Kafka client configuration dictionary

        Returns:
            Consumer: Configured confluent-kafka Consumer instance
        """

VALID_COMMIT_CADENCE = ["never", "end_of_batch", "end_of_operator"]

class KafkaAuthenticationError(Exception):
"""
Custom exception for Kafka authentication failures.
Raised when consumer authentication fails during connection
or topic subscription attempts.
"""
def error_callback(err):
    """
    Default error handling callback for consumer operations.

    Args:
        err: Error object from confluent-kafka consumer
    """

A basic DAG that consumes messages one at a time with a per-message apply function:

from airflow import DAG
from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator
from datetime import datetime
def process_message(message):
    """Process individual messages."""
    key = message.key().decode('utf-8') if message.key() else None
    value = message.value().decode('utf-8')
    print(f"Processing message - Key: {key}, Value: {value}")
    # Return True to indicate successful processing
    return True

dag = DAG(
    "kafka_consumer_example",
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False
)

consume_task = ConsumeFromTopicOperator(
    task_id="consume_messages",
    topics=["my-topic"],
    apply_function=process_message,
    max_messages=100,
    kafka_config_id="kafka_default",
    dag=dag
)

Process messages in batches for higher throughput:

from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator
from datetime import datetime
import json
def process_batch(messages):
    """Process messages in batches."""
    processed_records = []
    for message in messages:
        try:
            # Parse JSON message
            data = json.loads(message.value().decode('utf-8'))
            # Transform data
            processed_record = {
                "user_id": data["user_id"],
                "action": data["action"],
                "processed_at": datetime.now().isoformat()
            }
            processed_records.append(processed_record)
        except json.JSONDecodeError:
            print(f"Failed to parse message: {message.value()}")
            continue
    # Bulk insert or process
    print(f"Processed batch of {len(processed_records)} records")
    return processed_records

consume_batch_task = ConsumeFromTopicOperator(
    task_id="consume_batch",
    topics=["user-events"],
    apply_function_batch=process_batch,
    max_batch_size=500,
    commit_cadence="end_of_batch",
    kafka_config_id="kafka_default"
)

Consume from several topics and branch on each message's topic:

from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator
def multi_topic_processor(message):
    """Process messages from multiple topics."""
    topic = message.topic()
    value = message.value().decode('utf-8')
    if topic == "orders":
        print(f"Processing order: {value}")
        # Handle order logic
    elif topic == "payments":
        print(f"Processing payment: {value}")
        # Handle payment logic
    elif topic == "shipments":
        print(f"Processing shipment: {value}")
        # Handle shipment logic
    return True

multi_topic_task = ConsumeFromTopicOperator(
    task_id="consume_multi_topics",
    topics=["orders", "payments", "shipments"],
    apply_function=multi_topic_processor,
    max_messages=1000,
    poll_timeout=30,
    kafka_config_id="kafka_default"
)

Pass positional arguments through to the apply function for filtering or formatting:

from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator
import json
def parameterized_processor(message, filter_user_id, output_format):
    """Process messages with parameters."""
    try:
        data = json.loads(message.value().decode('utf-8'))
        # Filter by user ID if specified
        if filter_user_id and data.get("user_id") != filter_user_id:
            return None  # Skip this message
        # Format output
        if output_format == "csv":
            result = f"{data['user_id']},{data['action']},{data['timestamp']}"
        else:
            result = json.dumps(data)
        print(f"Processed: {result}")
        return result
    except Exception as e:
        print(f"Error processing message: {e}")
        return None

filtered_consume_task = ConsumeFromTopicOperator(
    task_id="filtered_consume",
    topics=["user-events"],
    apply_function=parameterized_processor,
    apply_function_args=[123, "json"],  # filter_user_id=123, output_format="json"
    max_messages=500,
    kafka_config_id="kafka_default"
)

Control offset commits for critical processing (process_transaction is a user-supplied helper):

from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator
import json
def critical_processor(message):
    """Process critical messages with manual commit control."""
    try:
        # Critical processing logic
        data = json.loads(message.value().decode('utf-8'))
        # Validate data
        if not all(k in data for k in ["id", "amount", "currency"]):
            raise ValueError("Missing required fields")
        # Process financial transaction (user-supplied helper)
        process_transaction(data)
        print(f"Successfully processed transaction {data['id']}")
        return True
    except Exception as e:
        print(f"Failed to process transaction: {e}")
        # Don't commit this message; it will be retried
        raise

critical_consume_task = ConsumeFromTopicOperator(
    task_id="consume_critical",
    topics=["financial-transactions"],
    apply_function=critical_processor,
    commit_cadence="end_of_operator",  # Only commit after all messages processed
    max_messages=100,
    poll_timeout=60,
    kafka_config_id="kafka_prod"
)

Use the KafkaConsumerHook directly for custom polling loops:

from airflow.operators.python import PythonOperator
from airflow.providers.apache.kafka.hooks.consume import KafkaConsumerHook
def custom_consumer_logic():
    """Use the consumer hook for advanced scenarios."""
    hook = KafkaConsumerHook(
        topics=["custom-topic"],
        kafka_config_id="kafka_default"
    )
    consumer = hook.get_consumer()
    messages_processed = 0
    try:
        while messages_processed < 100:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                print(f"Consumer error: {msg.error()}")
                continue
            # Custom processing
            print(f"Received: {msg.value().decode('utf-8')}")
            messages_processed += 1
            # Manual commit after processing
            consumer.commit(msg)
    except Exception as e:
        print(f"Consumer error: {e}")
        raise
    finally:
        consumer.close()

hook_consumer_task = PythonOperator(
    task_id="custom_consumer",
    python_callable=custom_consumer_logic
)

# Connection extra configuration for consumer groups
{
    "bootstrap.servers": "kafka:9092",
    "group.id": "airflow-consumer-group",
    "auto.offset.reset": "earliest",    # or "latest"
    "enable.auto.commit": "false",      # Manual commit control
    "session.timeout.ms": "30000",      # Consumer group session timeout
    "heartbeat.interval.ms": "3000",    # Heartbeat interval
    "fetch.min.bytes": "1024",          # Minimum bytes to fetch
    "fetch.wait.max.ms": "500"          # Maximum wait time for a fetch (librdkafka name)
    # Note: max.poll.records is a Java-client option and is not supported by confluent-kafka.
}
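One way to register this configuration is as the extra field of an Airflow connection. A minimal sketch using the AIRFLOW_CONN_<ID> environment-variable mechanism (the connection ID and broker address are illustrative):

import json
from airflow.models.connection import Connection

# Illustrative sketch: expose the config above as an Airflow connection.
conn = Connection(
    conn_id="kafka_default",
    conn_type="kafka",
    extra=json.dumps({
        "bootstrap.servers": "kafka:9092",
        "group.id": "airflow-consumer-group",
        "auto.offset.reset": "earliest",
        "enable.auto.commit": "false",
    }),
)
# Set this in the scheduler/worker environment to make the connection available.
print(f"AIRFLOW_CONN_KAFKA_DEFAULT={conn.get_uri()}")

Filter and transform messages inside the apply function:

from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator
import json

def filter_and_transform(message):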
"""Filter and transform messages based on content."""
try:
data = json.loads(message.value().decode('utf-8'))
# Filter based on message content
if data.get("event_type") not in ["purchase", "login"]:
return None # Skip message
# Transform message
transformed = {
"event_id": data["id"],
"user": data["user_id"],
"type": data["event_type"],
"timestamp": data["timestamp"],
"metadata": {
"topic": message.topic(),
"partition": message.partition(),
"offset": message.offset()
}
}
return transformed
except Exception as e:
print(f"Error filtering message: {e}")
return None
filter_task = ConsumeFromTopicOperator(
task_id="filter_messages",
topics=["raw-events"],
apply_function=filter_and_transform,
commit_cadence="end_of_batch",
max_batch_size=100,
kafka_config_id="kafka_default"
)from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator
def resilient_processor(message):
    """Process messages with retry logic."""
    max_retries = 3
    retry_count = 0
    while retry_count < max_retries:
        try:
            # Processing logic that might fail
            data = json.loads(message.value().decode('utf-8'))
            # Simulate external API call
            result = call_external_api(data)
            print(f"Successfully processed message: {result}")
            return result
        except Exception as e:
            retry_count += 1
            print(f"Attempt {retry_count} failed: {e}")
            if retry_count >= max_retries:
                # Log to a dead-letter topic or error handling
                print(f"Message failed after {max_retries} attempts")
                return None
            # Wait before retry (exponential backoff)
            time.sleep(2 ** retry_count)

resilient_task = ConsumeFromTopicOperator(
    task_id="resilient_consume",
    topics=["api-requests"],
    apply_function=resilient_processor,
    commit_cadence="end_of_operator",
    kafka_config_id="kafka_default"
)

Batch processing tuned for high throughput (transform_data and bulk_store are user-supplied helpers):

import json

def high_throughput_batch_processor(messages):
"""Optimized batch processing for high throughput."""
# Process in chunks for memory efficiency
chunk_size = 100
results = []
for i in range(0, len(messages), chunk_size):
chunk = messages[i:i + chunk_size]
# Parallel processing of chunk
chunk_results = []
for message in chunk:
try:
data = json.loads(message.value().decode('utf-8'))
# Fast processing logic
chunk_results.append(transform_data(data))
except Exception as e:
print(f"Error in chunk processing: {e}")
continue
results.extend(chunk_results)
# Optional: Intermediate processing/storage
if len(results) >= 1000:
bulk_store(results)
results = []
# Final storage
if results:
bulk_store(results)
return len(messages)
high_throughput_task = ConsumeFromTopicOperator(
task_id="high_throughput_consume",
topics=["high-volume-topic"],
apply_function_batch=high_throughput_batch_processor,
max_batch_size=2000,
poll_timeout=10,
commit_cadence="end_of_batch",
kafka_config_id="kafka_default"
)auto.offset.reset and commit strategiesmax_messages and poll_timeout to control consumption rateInstall with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-apache-kafka