Apache Kafka integration provider for Apache Airflow enabling workflows with Kafka message queues and streaming data platforms
npx @tessl/cli install tessl/pypi-apache-airflow-providers-apache-kafka@1.10.00
# Apache Airflow Kafka Provider

Apache Kafka integration provider for Apache Airflow, enabling data engineers to build workflows that interact with Kafka message queues and streaming data platforms. This provider offers a comprehensive set of operators for producing and consuming messages, hooks for low-level Kafka client operations, sensors for monitoring message availability, and triggers for asynchronous message processing.

## Package Information

- **Package Name**: apache-airflow-providers-apache-kafka
- **Package Type**: pip
- **Language**: Python
- **Installation**: `pip install apache-airflow-providers-apache-kafka`
- **Minimum Airflow Version**: 2.10.0
- **Dependencies**: confluent-kafka>=2.6.0, asgiref>=2.3.0

## Core Imports

```python
# Basic imports for common usage
from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator
from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator
from airflow.providers.apache.kafka.sensors.kafka import AwaitMessageSensor
```

Hook imports for advanced usage:

```python
from airflow.providers.apache.kafka.hooks.base import KafkaBaseHook
from airflow.providers.apache.kafka.hooks.consume import KafkaConsumerHook
from airflow.providers.apache.kafka.hooks.produce import KafkaProducerHook
from airflow.providers.apache.kafka.hooks.client import KafkaAdminClientHook
```

Sensor and trigger imports:

```python
from airflow.providers.apache.kafka.sensors.kafka import AwaitMessageSensor, AwaitMessageTriggerFunctionSensor
from airflow.providers.apache.kafka.triggers.await_message import AwaitMessageTrigger
```

## Basic Usage

```python
from datetime import datetime
from airflow import DAG
from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator
from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator

# Producer function: returns an iterable of (key, value) pairs; the
# operator creates the Kafka producer and publishes each pair for you
def produce_messages():
    for i in range(10):
        yield (None, f"Message {i}")

# Consumer processing function, applied to each consumed message
def process_message(message):
    print(f"Processing: {message.value().decode('utf-8')}")
    return True

dag = DAG(
    "kafka_example",
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
)

# Produce messages to a Kafka topic
produce_task = ProduceToTopicOperator(
    task_id="produce_messages",
    topic="my-topic",
    producer_function=produce_messages,
    kafka_config_id="kafka_default",
    dag=dag,
)

# Consume messages from a Kafka topic
consume_task = ConsumeFromTopicOperator(
    task_id="consume_messages",
    topics=["my-topic"],
    apply_function=process_message,
    max_messages=10,
    kafka_config_id="kafka_default",
    dag=dag,
)

produce_task >> consume_task
```

## Architecture

The provider is organized into five main capability areas:

- **Connection Management**: Centralized Kafka connection configuration and authentication
- **Administrative Operations**: Topic management and cluster administration
- **Message Production**: Publishing messages to Kafka topics with flexible delivery options
- **Message Consumption**: Consuming and processing messages with configurable commit strategies
- **Asynchronous Operations**: Event-driven sensors and triggers for real-time workflows

## Capabilities

### Connection Management

Base connection functionality providing authentication, configuration management, and Google Cloud Managed Kafka integration with automatic token generation.
101
102
```python { .api }
103
class KafkaBaseHook(BaseHook):
104
def __init__(self, kafka_config_id: str = "kafka_default") -> None: ...
105
def get_conn(self) -> Any: ...
106
def test_connection(self) -> tuple[bool, str]: ...
107
```
108
109
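A minimal sketch of verifying a connection through a concrete hook. Because `KafkaProducerHook` inherits from `KafkaBaseHook`, any concrete hook exposes `test_connection`; the connection ID is assumed to be configured as shown in the Configuration section below:

```python
from airflow.providers.apache.kafka.hooks.produce import KafkaProducerHook

# Any concrete hook inherits connection handling from KafkaBaseHook
hook = KafkaProducerHook(kafka_config_id="kafka_default")
success, message = hook.test_connection()
print(success, message)
```
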
[Connection Management](./connection-management.md)

### Administrative Operations

Administrative capabilities for managing Kafka clusters including topic creation and deletion with partition and replication configuration.

```python { .api }
class KafkaAdminClientHook(KafkaBaseHook):
    def create_topic(self, topics: Sequence[Sequence[Any]]) -> None: ...
    def delete_topic(self, topics: Sequence[str]) -> None: ...
```

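A hedged sketch of topic administration. Each inner sequence passed to `create_topic` is assumed to follow the `(topic_name, num_partitions, replication_factor)` layout implied by the `Sequence[Sequence[Any]]` signature above; the topic names are illustrative:

```python
from airflow.providers.apache.kafka.hooks.client import KafkaAdminClientHook

hook = KafkaAdminClientHook(kafka_config_id="kafka_default")

# Create topics: each entry is (topic_name, num_partitions, replication_factor)
hook.create_topic(topics=[("orders", 3, 1), ("payments", 1, 1)])

# Delete topics by name
hook.delete_topic(topics=["payments"])
```
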
[Administrative Operations](./admin-operations.md)

### Message Production

Message publishing capabilities with support for synchronous and asynchronous operations, custom delivery callbacks, and templated producer functions.

```python { .api }
class ProduceToTopicOperator(BaseOperator):
    def __init__(self, topic: str, producer_function: str | Callable, kafka_config_id: str = "kafka_default", **kwargs) -> None: ...

class KafkaProducerHook(KafkaBaseHook):
    def get_producer(self) -> Producer: ...
```

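For lower-level control than the operator, the hook hands back a `confluent_kafka.Producer` directly. A minimal sketch with a delivery callback (the topic name and callback are illustrative):

```python
from airflow.providers.apache.kafka.hooks.produce import KafkaProducerHook

def on_delivery(err, msg):
    # confluent-kafka invokes delivery callbacks from poll()/flush()
    if err is not None:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered to {msg.topic()} [{msg.partition()}]")

producer = KafkaProducerHook(kafka_config_id="kafka_default").get_producer()
producer.produce("my-topic", key="k1", value="hello", on_delivery=on_delivery)
producer.flush()  # block until outstanding deliveries are acknowledged
```
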
[Message Production](./message-production.md)

### Message Consumption

Message consumption capabilities with batch and individual processing, configurable commit strategies, and message filtering functions.

```python { .api }
class ConsumeFromTopicOperator(BaseOperator):
    def __init__(self, topics: str | Sequence[str], kafka_config_id: str = "kafka_default", **kwargs) -> None: ...

class KafkaConsumerHook(KafkaBaseHook):
    def get_consumer(self) -> Consumer: ...
```

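A minimal polling sketch. It assumes the hook takes the target topics at construction time and that `get_consumer` returns a `confluent_kafka.Consumer` already subscribed to them:

```python
from airflow.providers.apache.kafka.hooks.consume import KafkaConsumerHook

hook = KafkaConsumerHook(topics=["my-topic"], kafka_config_id="kafka_default")
consumer = hook.get_consumer()

msg = consumer.poll(timeout=5.0)  # wait up to 5 seconds for one message
if msg is not None and msg.error() is None:
    print(msg.value().decode("utf-8"))
consumer.close()
```
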
[Message Consumption](./message-consumption.md)

### Asynchronous Operations

Deferrable sensors and triggers for event-driven processing, enabling non-blocking message monitoring in Kafka topics.

```python { .api }
class AwaitMessageSensor(BaseOperator):
    def __init__(self, topics: Sequence[str], apply_function: str | Callable, kafka_config_id: str = "kafka_default", **kwargs) -> None: ...

class AwaitMessageTriggerFunctionSensor(BaseOperator):
    def __init__(self, topics: Sequence[str], apply_function: str | Callable, event_triggered_function: Callable, **kwargs) -> None: ...

class AwaitMessageTrigger(BaseTrigger):
    def __init__(self, topics: Sequence[str], apply_function: str, kafka_config_id: str = "kafka_default", **kwargs) -> None: ...
```

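A minimal sketch of the deferrable sensor. The dotted-path filter function is hypothetical; a string path is used because the underlying trigger (note its `apply_function: str` signature above) must import the function in the triggerer process, and the sensor completes once the function returns a truthy value for a message:

```python
from airflow.providers.apache.kafka.sensors.kafka import AwaitMessageSensor

wait_for_signal = AwaitMessageSensor(
    task_id="wait_for_signal",
    topics=["control-topic"],
    # Hypothetical module path: must resolve to a function that takes a
    # message and returns a truthy value to stop waiting
    apply_function="my_project.filters.is_go_signal",
    kafka_config_id="kafka_default",
)
```
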
[Asynchronous Operations](./async-operations.md)

## Configuration

The provider uses Airflow connections for configuration. Set up a connection with:

- **Connection Type**: kafka
- **Connection ID**: kafka_default (or custom)
- **Extra**: JSON configuration with `bootstrap.servers` and other Kafka client settings

Example connection configuration:

```json
{
  "bootstrap.servers": "localhost:9092",
  "security.protocol": "SASL_SSL",
  "sasl.mechanism": "PLAIN",
  "sasl.username": "your-username",
  "sasl.password": "your-password"
}
```

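One common way to register such a connection without the UI is an `AIRFLOW_CONN_*` environment variable. A sketch using Airflow's JSON connection format (the variable name encodes the connection ID):

```python
import json
import os

# Registers a connection with ID "kafka_default" and type "kafka"
os.environ["AIRFLOW_CONN_KAFKA_DEFAULT"] = json.dumps(
    {
        "conn_type": "kafka",
        "extra": {
            "bootstrap.servers": "localhost:9092",
        },
    }
)
```
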
## Error Handling

The provider includes specialized exception handling:

```python { .api }
class KafkaAuthenticationError(Exception):
    """Custom exception for Kafka authentication failures."""
```

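A hedged sketch of catching this exception; the import path is an assumption (check where your provider version defines it), and the failure mode shown is illustrative:

```python
# Assumed import path; the provider may define this exception elsewhere
from airflow.providers.apache.kafka.hooks.base import KafkaAuthenticationError
from airflow.providers.apache.kafka.hooks.produce import KafkaProducerHook

try:
    producer = KafkaProducerHook(kafka_config_id="kafka_default").get_producer()
except KafkaAuthenticationError as exc:
    # Raised on authentication failures, e.g. bad SASL credentials
    print(f"Kafka authentication failed: {exc}")
```
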
## Version Compatibility

The provider includes version compatibility helpers for different Airflow versions:

```python { .api }
def get_base_airflow_version_tuple() -> tuple[int, int, int]:
    """
    Get the base Airflow version as a tuple.

    Returns:
        tuple: (major, minor, patch) version numbers
    """

# Version flags for conditional imports and behavior
AIRFLOW_V_3_0_PLUS: bool  # True if Airflow 3.0+
AIRFLOW_V_3_1_PLUS: bool  # True if Airflow 3.1+

# Version-compatible base class imports
BaseHook      # Imports from airflow.sdk (3.1+) or airflow.hooks.base
BaseOperator  # Imports from airflow.sdk (3.0+) or airflow.models
```


### Version-Specific Import Patterns

```python
# Recommended: use the version-compatible imports
from airflow.providers.apache.kafka.version_compat import BaseHook, BaseOperator

# These imports resolve differently depending on the Airflow version:
# - BaseHook: airflow.sdk on Airflow 3.1+, otherwise airflow.hooks.base
# - BaseOperator: airflow.sdk on Airflow 3.0+, otherwise airflow.models
```

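A short sketch of branching on the version helpers from `version_compat` (the branch bodies are placeholders):

```python
from airflow.providers.apache.kafka.version_compat import (
    AIRFLOW_V_3_0_PLUS,
    get_base_airflow_version_tuple,
)

major, minor, patch = get_base_airflow_version_tuple()
print(f"Running on Airflow {major}.{minor}.{patch}")

if AIRFLOW_V_3_0_PLUS:
    pass  # Airflow 3.x code path (placeholder)
else:
    pass  # Airflow 2.10+ code path (placeholder)
```
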

These utilities enable the provider to work across multiple Airflow versions by automatically importing the correct base classes and handling version-specific differences.