# Administrative Operations

Administrative client for managing Kafka cluster resources including topics, consumer groups, configurations, and access control lists (ACLs). Provides comprehensive cluster management capabilities.

## Capabilities

### KafkaAdminClient

Main administrative client for cluster management operations.

```python { .api }
class KafkaAdminClient:
    """Main administrative client for cluster management operations."""

    def __init__(self, **configs):
        """
        Create a KafkaAdminClient instance.

        Args:
            **configs: Admin client configuration options including:
                bootstrap_servers (list): List of Kafka brokers
                client_id (str): Client identifier
                request_timeout_ms (int): Request timeout
                connections_max_idle_ms (int): Connection idle timeout
                retry_backoff_ms (int): Retry backoff time
                security_protocol (str): Security protocol
                ssl_context: SSL context
                sasl_mechanism (str): SASL mechanism
                sasl_plain_username (str): SASL username
                sasl_plain_password (str): SASL password
        """

    def create_topics(self, new_topics, timeout_ms=None, validate_only=False):
        """
        Create new topics.

        Args:
            new_topics (list): List of NewTopic objects
            timeout_ms (int): Operation timeout
            validate_only (bool): Only validate, don't create

        Returns:
            dict: Dictionary mapping topic name to CreateTopicsResponse.topic_errors
        """

    def delete_topics(self, topics, timeout_ms=None):
        """
        Delete topics.

        Args:
            topics (list): List of topic names to delete
            timeout_ms (int): Operation timeout

        Returns:
            dict: Dictionary mapping topic name to DeleteTopicsResponse.topic_errors
        """

    def list_topics(self, timeout_ms=None):
        """
        List all topics in cluster.

        Args:
            timeout_ms (int): Operation timeout

        Returns:
            ClusterMetadata: Cluster metadata with topic information
        """

    def describe_topics(self, topics, timeout_ms=None):
        """
        Get detailed information about topics.

        Args:
            topics (list): List of topic names
            timeout_ms (int): Operation timeout

        Returns:
            dict: Dictionary mapping topic name to TopicMetadata
        """

    def create_partitions(self, partition_updates, timeout_ms=None, validate_only=False):
        """
        Add partitions to existing topics.

        Args:
            partition_updates (dict): Dictionary mapping topic name to NewPartitions
            timeout_ms (int): Operation timeout
            validate_only (bool): Only validate, don't create

        Returns:
            dict: Dictionary mapping topic name to CreatePartitionsResponse.topic_errors
        """

    def describe_configs(self, config_resources, timeout_ms=None, include_synonyms=False):
        """
        Get configuration for resources.

        Args:
            config_resources (list): List of ConfigResource objects
            timeout_ms (int): Operation timeout
            include_synonyms (bool): Include config synonyms

        Returns:
            dict: Dictionary mapping ConfigResource to DescribeConfigsResponse.resources
        """

    def alter_configs(self, config_updates, timeout_ms=None, validate_only=False):
        """
        Alter configuration for resources.

        Args:
            config_updates (dict): Dictionary mapping ConfigResource to config changes
            timeout_ms (int): Operation timeout
            validate_only (bool): Only validate, don't alter

        Returns:
            dict: Dictionary mapping ConfigResource to AlterConfigsResponse.resources
        """

    def list_consumer_groups(self, timeout_ms=None):
        """
        List consumer groups in cluster.

        Args:
            timeout_ms (int): Operation timeout

        Returns:
            list: List of GroupInformation objects
        """

    def describe_consumer_groups(self, group_ids, timeout_ms=None):
        """
        Get detailed information about consumer groups.

        Args:
            group_ids (list): List of consumer group IDs
            timeout_ms (int): Operation timeout

        Returns:
            dict: Dictionary mapping group ID to GroupInformation
        """

    def delete_consumer_groups(self, group_ids, timeout_ms=None):
        """
        Delete consumer groups.

        Args:
            group_ids (list): List of consumer group IDs to delete
            timeout_ms (int): Operation timeout

        Returns:
            dict: Dictionary mapping group ID to delete response
        """

    def list_consumer_group_offsets(self, group_id, partitions=None, timeout_ms=None):
        """
        Get committed offsets for consumer group.

        Args:
            group_id (str): Consumer group ID
            partitions (list): List of TopicPartition objects (None = all)
            timeout_ms (int): Operation timeout

        Returns:
            dict: Dictionary mapping TopicPartition to OffsetAndMetadata
        """

    def alter_consumer_group_offsets(self, group_id, offsets, timeout_ms=None):
        """
        Alter committed offsets for consumer group.

        Args:
            group_id (str): Consumer group ID
            offsets (dict): Dictionary mapping TopicPartition to OffsetAndMetadata
            timeout_ms (int): Operation timeout

        Returns:
            dict: Dictionary mapping TopicPartition to alter response
        """

    def create_acls(self, acls, timeout_ms=None):
        """
        Create access control lists.

        Args:
            acls (list): List of ACL objects
            timeout_ms (int): Operation timeout

        Returns:
            list: List of CreateAclsResponse.creation_responses
        """

    def describe_acls(self, acl_filter, timeout_ms=None):
        """
        Describe access control lists matching filter.

        Args:
            acl_filter (ACLFilter): Filter for ACLs to describe
            timeout_ms (int): Operation timeout

        Returns:
            list: List of ACL objects matching filter
        """

    def delete_acls(self, acl_filters, timeout_ms=None):
        """
        Delete access control lists matching filters.

        Args:
            acl_filters (list): List of ACLFilter objects
            timeout_ms (int): Operation timeout

        Returns:
            list: List of DeleteAclsResponse.filter_responses
        """

    def describe_cluster(self, timeout_ms=None):
        """
        Get cluster metadata including brokers and cluster ID.

        Args:
            timeout_ms (int): Operation timeout

        Returns:
            ClusterMetadata: Cluster information including brokers and cluster ID
        """

    def describe_log_dirs(self, broker_ids=None, timeout_ms=None):
        """
        Describe log directories on brokers.

        Args:
            broker_ids (list): List of broker IDs (None = all brokers)
            timeout_ms (int): Operation timeout

        Returns:
            dict: Dictionary mapping broker ID to log directory information
        """

    def close(self):
        """Close the admin client and release resources."""
```

### Topic Management Types

Types for creating and managing topics.

```python { .api }
class NewTopic:
    def __init__(self, name, num_partitions, replication_factor, replica_assignments=None, topic_configs=None):
        """
        Specification for creating a new topic.

        Args:
            name (str): Topic name
            num_partitions (int): Number of partitions
            replication_factor (int): Replication factor
            replica_assignments (dict): Custom replica assignments (optional)
            topic_configs (dict): Topic configuration properties (optional)
        """

    # Attributes mirroring the constructor arguments.
    name: str
    num_partitions: int
    replication_factor: int
    replica_assignments: dict
    topic_configs: dict
264
265
class NewPartitions:
    def __init__(self, total_count, new_assignments=None):
        """
        Specification for adding partitions to existing topic.

        Args:
            total_count (int): New total partition count
            new_assignments (list): Replica assignments for new partitions (optional)
        """

    # Attributes mirroring the constructor arguments.
    total_count: int
    new_assignments: list
```

### Configuration Management Types

Types for managing resource configurations.

```python { .api }
class ConfigResource:
    def __init__(self, resource_type, name, configs=None):
        """
        Resource for configuration operations.

        Args:
            resource_type (ConfigResourceType): Type of resource
            name (str): Resource name
            configs (dict): Configuration properties (optional)
        """

    # Attributes mirroring the constructor arguments.
    resource_type: ConfigResourceType
    name: str
    configs: dict
298
299
class ConfigResourceType:
    """Integer codes for the resource types accepted by ConfigResource."""

    BROKER = 4  # Broker configuration
    TOPIC = 2  # Topic configuration
```

### Access Control Types

Types for managing access control lists (ACLs).

```python { .api }
class ACL:
    def __init__(self, principal, host, operation, permission_type, resource_pattern):
        """
        Access control list entry.

        Args:
            principal (str): Principal (user/service account)
            host (str): Host pattern
            operation (ACLOperation): Operation type
            permission_type (ACLPermissionType): Permission type
            resource_pattern (ResourcePattern): Resource pattern
        """

    # Attributes mirroring the constructor arguments.
    principal: str
    host: str
    operation: ACLOperation
    permission_type: ACLPermissionType
    resource_pattern: ResourcePattern
327
328
class ACLFilter:
    def __init__(self, principal=None, host=None, operation=None, permission_type=None, resource_pattern=None):
        """
        Filter for ACL operations.

        Every argument defaults to None.
        NOTE(review): None presumably means "do not filter on this field" -
        confirm against the library before relying on it.

        Args:
            principal: Principal to match (optional)
            host: Host to match (optional)
            operation: Operation to match (optional)
            permission_type: Permission type to match (optional)
            resource_pattern: Resource pattern to match (optional)
        """
331
332
class ResourcePattern:
    def __init__(self, resource_type, resource_name, pattern_type):
        """
        Resource pattern for ACL matching.

        Args:
            resource_type (ResourceType): Type of resource
            resource_name (str): Resource name/pattern
            pattern_type (ACLResourcePatternType): Pattern matching type
        """

    # Attributes mirroring the constructor arguments.
    resource_type: ResourceType
    resource_name: str
    pattern_type: ACLResourcePatternType
346
347
class ACLOperation:
    """Integer codes for the operation an ACL entry applies to."""

    ANY = 1
    ALL = 2
    READ = 3
    WRITE = 4
    CREATE = 5
    DELETE = 6
    ALTER = 7
    DESCRIBE = 8
    CLUSTER_ACTION = 9
    DESCRIBE_CONFIGS = 10
    ALTER_CONFIGS = 11
    IDEMPOTENT_WRITE = 12
360
361
class ACLPermissionType:
    """Integer codes for the permission type of an ACL entry."""

    ANY = 1
    DENY = 2
    ALLOW = 3
365
366
class ResourceType:
    """Integer codes for the resource type named in a ResourcePattern."""

    UNKNOWN = 0
    ANY = 1
    CLUSTER = 4
    DELEGATION_TOKEN = 6
    GROUP = 3
    TOPIC = 2
    TRANSACTIONAL_ID = 5
374
375
class ACLResourcePatternType:
    """Integer codes for the pattern matching type of a ResourcePattern."""

    ANY = 1
    MATCH = 2
    LITERAL = 3
    PREFIXED = 4
```

## Usage Examples

### Topic Management

```python
from kafka import KafkaAdminClient
from kafka.admin import NewTopic, NewPartitions

# Create admin client
admin = KafkaAdminClient(
    bootstrap_servers=['localhost:9092'],
    client_id='admin-client'
)

# Create topics
topics = [
    NewTopic(
        name='user-events',
        num_partitions=6,
        replication_factor=3,
        topic_configs={
            'cleanup.policy': 'compact',
            'retention.ms': '604800000'  # 7 days
        }
    ),
    NewTopic(
        name='analytics',
        num_partitions=12,
        replication_factor=2
    )
]

result = admin.create_topics(topics, timeout_ms=30000)
# The result maps topic name -> error info, so iterate key/value pairs
# with items(); values() alone would not yield the topic name.
for topic, error in result.items():
    if error is None:
        print(f"Topic {topic} created successfully")
    else:
        print(f"Failed to create topic {topic}: {error}")

# Add partitions to existing topic
partition_updates = {
    'user-events': NewPartitions(total_count=10)
}
admin.create_partitions(partition_updates)

# Delete topics
admin.delete_topics(['old-topic'], timeout_ms=30000)

admin.close()
```

### Configuration Management

```python
from kafka import KafkaAdminClient
from kafka.admin import ConfigResource, ConfigResourceType

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

# Get topic configuration
topic_resource = ConfigResource(ConfigResourceType.TOPIC, 'my-topic')
config_result = admin.describe_configs([topic_resource])

for resource, config_response in config_result.items():
    print(f"Configuration for {resource.name}:")
    for config in config_response.configs:
        print(f"  {config.name} = {config.value}")

# Alter topic configuration
config_updates = {
    topic_resource: {
        'retention.ms': '86400000',  # 1 day
        'segment.ms': '3600000'  # 1 hour
    }
}
admin.alter_configs(config_updates)

admin.close()
```

### Consumer Group Management

```python
from kafka import KafkaAdminClient
from kafka.structs import TopicPartition, OffsetAndMetadata

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

# List all consumer groups
groups = admin.list_consumer_groups()
for group in groups:
    print(f"Group: {group.group}, State: {group.state}")

# Get detailed group information
group_details = admin.describe_consumer_groups(['my-consumer-group'])
for group_id, group_info in group_details.items():
    print(f"Group {group_id}:")
    print(f"  State: {group_info.state}")
    print(f"  Protocol: {group_info.protocol}")
    print(f"  Members: {len(group_info.members)}")

# Get committed offsets
group_id = 'my-consumer-group'
offsets = admin.list_consumer_group_offsets(group_id)
for partition, offset_metadata in offsets.items():
    print(f"{partition.topic}:{partition.partition} = {offset_metadata.offset}")

# Reset offsets
new_offsets = {
    TopicPartition('my-topic', 0): OffsetAndMetadata(1000, 'reset'),
    TopicPartition('my-topic', 1): OffsetAndMetadata(2000, 'reset')
}
admin.alter_consumer_group_offsets(group_id, new_offsets)

admin.close()
```

### Access Control Management

```python
from kafka import KafkaAdminClient
# ResourcePatternFilter is needed for the filter built in "List ACLs" below.
from kafka.admin import (ACL, ACLFilter, ResourcePattern, ResourcePatternFilter,
                         ACLOperation, ACLPermissionType,
                         ResourceType, ACLResourcePatternType)

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

# Create ACLs
acls = [
    ACL(
        principal='User:alice',
        host='*',
        operation=ACLOperation.READ,
        permission_type=ACLPermissionType.ALLOW,
        resource_pattern=ResourcePattern(
            resource_type=ResourceType.TOPIC,
            resource_name='user-data',
            pattern_type=ACLResourcePatternType.LITERAL
        )
    ),
    ACL(
        principal='User:service-account',
        host='*',
        operation=ACLOperation.WRITE,
        permission_type=ACLPermissionType.ALLOW,
        resource_pattern=ResourcePattern(
            resource_type=ResourceType.TOPIC,
            resource_name='events-',
            pattern_type=ACLResourcePatternType.PREFIXED
        )
    )
]

admin.create_acls(acls)

# List ACLs
acl_filter = ACLFilter(
    resource_pattern=ResourcePatternFilter(
        resource_type=ResourceType.TOPIC,
        resource_name=None,  # All topics
        pattern_type=ACLResourcePatternType.ANY
    )
)
existing_acls = admin.describe_acls(acl_filter)
for acl in existing_acls:
    print(f"ACL: {acl.principal} {acl.permission_type} {acl.operation} on {acl.resource_pattern}")

admin.close()
```