Apache Kafka integration provider for Apache Airflow enabling workflows with Kafka message queues and streaming data platforms
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Base connection functionality providing authentication, configuration management, and Google Cloud Managed Kafka integration with automatic token generation.
Provides the foundational connection management for all Kafka operations with support for multiple authentication mechanisms and connection testing.
class KafkaBaseHook(BaseHook):
"""
A base hook for interacting with Apache Kafka.
Attributes:
conn_name_attr: str = "kafka_config_id"
default_conn_name: str = "kafka_default"
conn_type: str = "kafka"
hook_name: str = "Apache Kafka"
"""
def __init__(self, kafka_config_id: str = "kafka_default", *args, **kwargs) -> None:
"""
Initialize the Kafka base hook.
Args:
kafka_config_id: The Airflow connection ID holding the Kafka configuration to use
*args: Additional positional arguments
**kwargs: Additional keyword arguments
"""
def get_conn(self) -> Any:
"""
Get Kafka AdminClient connection.
Returns:
AdminClient: Configured Kafka AdminClient instance
"""
def test_connection(self) -> tuple[bool, str]:
"""
Test the Kafka connection.
Returns:
tuple: (success: bool, message: str)
"""
def _get_client(self, config) -> Any:
"""
Get a Kafka client with the given configuration.
Args:
config: Kafka client configuration dictionary
Returns:
Any: Kafka client instance (AdminClient by default)
"""
@classmethod
def get_ui_field_behaviour(cls) -> dict[str, Any]:
"""
Return custom field behaviour for Airflow UI.
Returns:
dict: UI field configuration
"""The hook supports various Kafka connection configurations through Airflow connections:
# Connection extra field example
{
"bootstrap.servers": "localhost:9092",
"security.protocol": "PLAINTEXT", # or SASL_PLAINTEXT, SASL_SSL, SSL
"client.id": "airflow-client"
}

# SASL/PLAIN configuration
{
"bootstrap.servers": "broker:9092",
"security.protocol": "SASL_SSL",
"sasl.mechanism": "PLAIN",
"sasl.username": "username",
"sasl.password": "password"
}

The provider includes automatic support for Google Cloud Managed Kafka with token-based authentication:
# Google Cloud configuration (tokens handled automatically)
{
"bootstrap.servers": "your-cluster.kafka.gcp.example.com:9092",
"security.protocol": "SASL_SSL",
"sasl.mechanism": "OAUTHBEARER"
}

from airflow.providers.apache.kafka.hooks.base import KafkaBaseHook
# Initialize with default connection
hook = KafkaBaseHook()
# Initialize with custom connection
hook = KafkaBaseHook(kafka_config_id="my_kafka_conn")
# Test the connection
success, message = hook.test_connection()
if success:
print("Connection successful")
else:
print(f"Connection failed: {message}")from airflow.providers.apache.kafka.hooks.base import KafkaBaseHook
hook = KafkaBaseHook(kafka_config_id="kafka_default")
# Get the underlying AdminClient
admin_client = hook.get_conn()
# Use for administrative operations
metadata = admin_client.list_topics()
print(f"Available topics: {list(metadata.topics.keys())}")Connection-related errors are handled through standard Kafka client exceptions. The test_connection() method provides a safe way to verify connectivity:
hook = KafkaBaseHook(kafka_config_id="kafka_connection")
success, error_message = hook.test_connection()
if not success:
print(f"Connection test failed: {error_message}")
# Handle connection failure

{
"bootstrap.servers": "localhost:9092",
"security.protocol": "PLAINTEXT"
}

{
"bootstrap.servers": "kafka-cluster:9092",
"security.protocol": "SASL_SSL",
"sasl.mechanism": "PLAIN",
"sasl.username": "{{ var.kafka_username }}",
"sasl.password": "{{ var.kafka_password }}",
"ssl.ca.location": "/path/to/ca-cert",
"client.id": "airflow-prod"
}

Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-apache-kafka