# Administrative Operations

Administrative capabilities for managing Kafka clusters, including topic creation and deletion with partition and replication configuration.

## Capabilities

### Kafka Admin Client Hook

Provides administrative operations for Kafka clusters using the Kafka AdminClient API. Extends the base hook with cluster management capabilities.

```python { .api }
class KafkaAdminClientHook(KafkaBaseHook):
    """
    A hook for administrative Kafka operations.

    Inherits from KafkaBaseHook and provides methods for topic management
    and other administrative tasks.
    """

    def __init__(self, kafka_config_id: str = "kafka_default") -> None:
        """
        Initialize the Kafka admin client hook.

        Args:
            kafka_config_id: The connection ID to use.
        """

    def create_topic(self, topics: Sequence[Sequence[Any]]) -> None:
        """
        Create Kafka topics.

        Args:
            topics: Sequence of topic configurations, where each topic is a
                sequence of [topic_name, partition_count, replication_factor].

        Raises:
            KafkaException: If topic creation fails.
        """

    def delete_topic(self, topics: Sequence[str]) -> None:
        """
        Delete Kafka topics.

        Args:
            topics: Sequence of topic names to delete.

        Raises:
            KafkaException: If topic deletion fails.
        """

    def _get_client(self, config) -> AdminClient:
        """
        Get a Kafka AdminClient with the given configuration.

        Args:
            config: Kafka client configuration dictionary.

        Returns:
            AdminClient: Configured confluent-kafka AdminClient instance.
        """
```
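Since `create_topic` accepts loosely typed sequences, a small pre-flight check can catch malformed specs before they reach the broker. This is a hypothetical helper, not part of the provider:

```python
from typing import Any, Sequence


def validate_topic_specs(topics: Sequence[Sequence[Any]]) -> None:
    """Raise ValueError for any spec that is not [name, partitions, replication_factor]."""
    for spec in topics:
        if len(spec) != 3:
            raise ValueError(f"Expected [name, partitions, replication_factor], got: {spec!r}")
        name, partitions, replication = spec
        if not isinstance(name, str) or not name:
            raise ValueError(f"Topic name must be a non-empty string: {name!r}")
        if not isinstance(partitions, int) or partitions < 1:
            raise ValueError(f"Partition count must be a positive integer: {partitions!r}")
        if not isinstance(replication, int) or replication < 1:
            raise ValueError(f"Replication factor must be a positive integer: {replication!r}")
```

Calling this before `admin_hook.create_topic(topics)` turns a confusing broker-side failure into an immediate, descriptive error.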

### Usage Examples

#### Creating Topics

```python
from airflow.providers.apache.kafka.hooks.client import KafkaAdminClientHook

admin_hook = KafkaAdminClientHook(kafka_config_id="kafka_default")

# Create a single topic with default settings
topics_to_create = [
    ["my-topic", 3, 1]  # topic_name, partitions, replication_factor
]

admin_hook.create_topic(topics_to_create)
```

#### Creating Multiple Topics

```python
from airflow.providers.apache.kafka.hooks.client import KafkaAdminClientHook

admin_hook = KafkaAdminClientHook()

# Create several topics in a single call
topics_config = [
    ["high-throughput-topic", 12, 3],  # topic_name, partitions, replication_factor
    ["log-compaction-topic", 6, 2],
]

admin_hook.create_topic(topics_config)
```

#### Deleting Topics

```python
from airflow.providers.apache.kafka.hooks.client import KafkaAdminClientHook

admin_hook = KafkaAdminClientHook()

# Delete a single topic
admin_hook.delete_topic(["old-topic"])

# Delete multiple topics
topics_to_delete = [
    "temp-topic-1",
    "temp-topic-2",
    "test-topic",
]
admin_hook.delete_topic(topics_to_delete)
```
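When cleaning up, it is often safer to select deletion candidates by naming convention than to hard-code topic names. A minimal sketch (the prefixes are illustrative assumptions):

```python
from typing import Iterable, List, Sequence


def select_topics_for_cleanup(
    all_topics: Iterable[str],
    prefixes: Sequence[str] = ("temp-", "test-"),
) -> List[str]:
    """Return topic names matching any cleanup prefix, preserving order."""
    prefix_tuple = tuple(prefixes)
    return [name for name in all_topics if name.startswith(prefix_tuple)]
```

The resulting list can be passed straight to `admin_hook.delete_topic(...)`; restricting deletion to known prefixes reduces the risk of removing a production topic by accident.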

### Administrative DAG Example

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.apache.kafka.hooks.client import KafkaAdminClientHook


def create_kafka_topics():
    """Create required Kafka topics for the data pipeline."""
    admin_hook = KafkaAdminClientHook(kafka_config_id="kafka_prod")

    topics = [
        ["user-events", 12, 3],  # topic_name, partitions, replication_factor
        ["processed-events", 6, 2],
        ["error-events", 3, 2],
    ]

    admin_hook.create_topic(topics)
    return "Topics created successfully"


def cleanup_old_topics():
    """Remove temporary or test topics."""
    admin_hook = KafkaAdminClientHook(kafka_config_id="kafka_prod")

    topics_to_remove = [
        "test-topic-2023",
        "temp-processing-topic",
        "old-user-data",
    ]

    admin_hook.delete_topic(topics_to_remove)
    return "Old topics cleaned up"


dag = DAG(
    "kafka_admin_operations",
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
    description="Kafka administrative operations",
)

create_topics_task = PythonOperator(
    task_id="create_topics",
    python_callable=create_kafka_topics,
    dag=dag,
)

cleanup_topics_task = PythonOperator(
    task_id="cleanup_topics",
    python_callable=cleanup_old_topics,
    dag=dag,
)

create_topics_task >> cleanup_topics_task
```

### Topic Configuration Options

Common topic-level configuration parameters. Note that `create_topic` as documented above takes only the topic name, partition count, and replication factor; the settings below are Kafka topic configs, typically applied with the `kafka-configs.sh` CLI or directly through the AdminClient.

#### Retention Settings

```python
{
    "retention.ms": "604800000",      # 7 days in milliseconds
    "retention.bytes": "1073741824",  # 1 GB per partition
    "cleanup.policy": "delete",       # or "compact" for log compaction
}
```
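Kafka expects these duration values as millisecond strings, which are easy to get wrong by hand. A small conversion helper (hypothetical, not part of any Kafka API) keeps configs readable:

```python
def days_to_ms(days: float) -> str:
    """Convert a duration in days to the millisecond string Kafka topic configs expect."""
    return str(int(days * 24 * 60 * 60 * 1000))


# Equivalent to the literal values above, but self-documenting
retention_config = {
    "retention.ms": days_to_ms(7),  # "604800000"
    "cleanup.policy": "delete",
}
```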

#### Performance Settings

```python
{
    "segment.ms": "86400000",        # 1 day segment roll time
    "max.message.bytes": "1000000",  # ~1 MB max message size
    "compression.type": "snappy",    # or "gzip", "lz4", "zstd"
    "min.insync.replicas": "2",      # minimum in-sync replicas
}
```

#### Log Compaction Settings

```python
{
    "cleanup.policy": "compact",
    "min.cleanable.dirty.ratio": "0.5",
    "delete.retention.ms": "86400000",  # tombstone retention
    "min.compaction.lag.ms": "0",
}
```

### Error Handling

Administrative operations can fail for many reasons, such as broker unavailability, authorization errors, or topics that already exist. Handle exceptions appropriately:

```python
from confluent_kafka import KafkaException

from airflow.providers.apache.kafka.hooks.client import KafkaAdminClientHook


def safe_topic_creation():
    admin_hook = KafkaAdminClientHook()

    try:
        topics = [["new-topic", 3, 1]]
        admin_hook.create_topic(topics)
        print("Topic created successfully")
    except KafkaException as e:
        if "already exists" in str(e):
            print("Topic already exists, continuing...")
        else:
            print(f"Failed to create topic: {e}")
            raise
    except Exception as e:
        print(f"Unexpected error: {e}")
        raise
```
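Transient broker errors often resolve on a retry. A generic retry wrapper with exponential backoff can wrap any admin call; this is a sketch, and the attempt and delay values are illustrative defaults to tune:

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def with_retries(
    fn: Callable[[], T],
    attempts: int = 3,
    base_delay: float = 1.0,
    retryable: Tuple[Type[BaseException], ...] = (Exception,),
) -> T:
    """Call fn(), retrying with exponential backoff on retryable exceptions."""
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except retryable:
            if attempt == attempts:
                raise  # out of attempts; surface the last error
            time.sleep(base_delay * 2 ** (attempt - 1))
    raise RuntimeError("unreachable")
```

For example, `with_retries(lambda: admin_hook.create_topic(topics))` retries transient failures while still raising after the final attempt, so the task fails loudly rather than silently.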

### Best Practices

1. **Partition Planning**: Choose the partition count based on expected throughput and consumer parallelism.
2. **Replication Factor**: Use at least 2 for important topics and 3 for critical data.
3. **Retention Policy**: Set retention based on data lifecycle requirements.
4. **Error Handling**: Always handle topic creation/deletion failures gracefully.
5. **Monitoring**: Log administrative operations for audit and troubleshooting purposes.
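
The partition-planning guidance above can be made concrete with a rough sizing rule: provision enough partitions to cover the target throughput, and at least one per consumer in the largest consumer group. A back-of-the-envelope sketch, where the per-partition throughput figure is an assumption you should measure for your own cluster:

```python
import math


def suggest_partition_count(
    target_mb_per_s: float,
    per_partition_mb_per_s: float = 10.0,  # assumed sustainable write rate per partition
    consumer_count: int = 1,
) -> int:
    """Suggest a partition count covering both throughput and consumer parallelism."""
    throughput_partitions = math.ceil(target_mb_per_s / per_partition_mb_per_s)
    return max(throughput_partitions, consumer_count, 1)
```

Since increasing partitions later is easier than decreasing them (which requires recreating the topic), rounding up from this estimate is usually the safer choice.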