0
# Administrative API
1
2
Administrative client for managing Kafka clusters including topic operations, partition management, configuration changes, access control lists, and consumer group administration.
3
4
## Capabilities
5
6
### KafkaAdminClient
7
8
Main administrative client providing comprehensive cluster management capabilities. Supports all Kafka administrative operations with proper error handling and timeouts.
9
10
```python { .api }
11
class KafkaAdminClient:
    def __init__(self, **configs):
        """
        Initialize admin client.

        Configuration Parameters:
        - bootstrap_servers: List[str], broker addresses
        - client_id: str, client identifier
        - connections_max_idle_ms: int, max idle time (default: 540000)
        - request_timeout_ms: int, request timeout (default: 30000)
        - retry_backoff_ms: int, retry backoff (default: 100)
        - reconnect_backoff_ms: int, reconnect backoff (default: 50)
        - security_protocol: str, 'PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'
        - api_version: tuple, broker API version or 'auto'
        """

    def create_topics(self, topic_requests, timeout_ms=None, validate_only=False):
        """
        Create new topics.

        Parameters:
        - topic_requests: List[NewTopic], topic creation requests
        - timeout_ms: int, operation timeout
        - validate_only: bool, validate without creating

        Returns:
        - Dict[str, Future]: topic name to creation result future
        """

    def delete_topics(self, topics, timeout_ms=None):
        """
        Delete topics.

        Parameters:
        - topics: List[str], topic names to delete
        - timeout_ms: int, operation timeout

        Returns:
        - Dict[str, Future]: topic name to deletion result future
        """

    def list_topics(self, timeout_ms=None):
        """
        List available topics.

        Parameters:
        - timeout_ms: int, operation timeout

        Returns:
        - Set[str]: available topic names
        """

    def describe_topics(self, topics, timeout_ms=None):
        """
        Get detailed topic information.

        Parameters:
        - topics: List[str], topic names to describe
        - timeout_ms: int, operation timeout

        Returns:
        - Dict[str, TopicDescription]: topic descriptions
        """

    def create_partitions(self, partition_updates, timeout_ms=None, validate_only=False):
        """
        Add partitions to existing topics.

        Parameters:
        - partition_updates: Dict[str, NewPartitions], topic to partition updates
        - timeout_ms: int, operation timeout
        - validate_only: bool, validate without creating

        Returns:
        - Dict[str, Future]: topic name to result future
        """

    def describe_configs(self, config_resources, timeout_ms=None):
        """
        Get configuration for resources.

        Parameters:
        - config_resources: List[ConfigResource], resources to describe
        - timeout_ms: int, operation timeout

        Returns:
        - Dict[ConfigResource, ConfigResourceResult]: configuration results
        """

    def alter_configs(self, config_resources, timeout_ms=None):
        """
        Modify configuration for resources.

        Parameters:
        - config_resources: Dict[ConfigResource, Dict[str, str]], config changes
        - timeout_ms: int, operation timeout

        Returns:
        - Dict[ConfigResource, Future]: configuration change results
        """

    def describe_acls(self, acl_filter, timeout_ms=None):
        """
        Describe access control lists.

        Parameters:
        - acl_filter: ACLFilter, filter for ACL queries
        - timeout_ms: int, operation timeout

        Returns:
        - List[ACLBinding]: matching ACL bindings
        """

    def create_acls(self, acls, timeout_ms=None):
        """
        Create access control lists.

        Parameters:
        - acls: List[ACL], ACLs to create
        - timeout_ms: int, operation timeout

        Returns:
        - Dict[ACL, Future]: ACL creation results
        """

    def delete_acls(self, acl_filters, timeout_ms=None):
        """
        Delete access control lists.

        Parameters:
        - acl_filters: List[ACLFilter], filters for ACLs to delete
        - timeout_ms: int, operation timeout

        Returns:
        - List[DeleteAclsResult]: deletion results
        """

    def list_consumer_groups(self, timeout_ms=None):
        """
        List consumer groups.

        Parameters:
        - timeout_ms: int, operation timeout

        Returns:
        - List[GroupInformation]: consumer group information
        """

    def describe_consumer_groups(self, group_ids, timeout_ms=None):
        """
        Get detailed consumer group information.

        Parameters:
        - group_ids: List[str], group IDs to describe
        - timeout_ms: int, operation timeout

        Returns:
        - Dict[str, GroupDescription]: group descriptions
        """

    def delete_consumer_groups(self, group_ids, timeout_ms=None):
        """
        Delete consumer groups.

        Parameters:
        - group_ids: List[str], group IDs to delete
        - timeout_ms: int, operation timeout

        Returns:
        - Dict[str, Future]: deletion results
        """

    def close(self):
        """Close admin client and clean up resources."""
185
```
186
187
### Topic Management
188
189
Classes for creating and modifying topics.
190
191
```python { .api }
192
class NewTopic:
    def __init__(self, name, num_partitions, replication_factor,
                 replica_assignments=None, topic_configs=None):
        """
        Topic creation specification.

        Parameters:
        - name: str, topic name
        - num_partitions: int, number of partitions
        - replication_factor: int, replication factor
        - replica_assignments: Dict[int, List[int]], manual replica assignments
        - topic_configs: Dict[str, str], topic configuration overrides
        """
        self.name = name
        self.num_partitions = num_partitions
        self.replication_factor = replication_factor
        # None (or any falsy value) is normalised to a fresh empty dict.
        self.replica_assignments = replica_assignments or {}
        self.topic_configs = topic_configs or {}
210
211
class NewPartitions:
    def __init__(self, total_count, new_assignments=None):
        """
        Partition addition specification.

        Parameters:
        - total_count: int, new total partition count
        - new_assignments: List[List[int]], replica assignments for new partitions
        """
        self.total_count = total_count
        self.new_assignments = new_assignments
222
223
class TopicDescription:
    name: str                            # Topic name
    partitions: List[PartitionMetadata]  # Partition metadata
    is_internal: bool                    # Internal topic flag
    authorizedOperations: List[int]      # Authorized operations
228
```
229
230
### Configuration Management
231
232
Classes for managing broker and topic configurations.
233
234
```python { .api }
235
class ConfigResource:
    def __init__(self, resource_type, name, configs=None):
        """
        Configuration resource specification.

        Parameters:
        - resource_type: ConfigResourceType, resource type
        - name: str, resource name
        - configs: Dict[str, str], configuration key-value pairs
        """
        self.resource_type = resource_type
        self.name = name
        # None (or any falsy value) is normalised to a fresh empty dict.
        self.configs = configs or {}
248
249
class ConfigResourceType:
    BROKER = 4  # Broker configuration
    TOPIC = 2   # Topic configuration
252
253
class ConfigResourceResult:
    configs: Dict[str, ConfigEntry]  # Configuration entries
    error_code: int                  # Error code (0 = success)
    error_message: str               # Error description
257
258
class ConfigEntry:
    name: str                        # Configuration key
    value: str                       # Configuration value
    is_default: bool                 # Is default value
    is_sensitive: bool               # Is sensitive value
    is_read_only: bool               # Is read-only
    synonyms: List['ConfigSynonym']  # Configuration synonyms
265
```
266
267
### Access Control Lists (ACL)
268
269
Classes for managing access control and authorization.
270
271
```python { .api }
272
class ACL:
    def __init__(self, principal, host, operation, permission_type, resource_pattern):
        """
        Access control list entry.

        Parameters:
        - principal: str, principal (user/service)
        - host: str, host pattern
        - operation: ACLOperation, operation type
        - permission_type: ACLPermissionType, ALLOW or DENY
        - resource_pattern: ResourcePattern, resource pattern
        """
        self.principal = principal
        self.host = host
        self.operation = operation
        self.permission_type = permission_type
        self.resource_pattern = resource_pattern
289
290
class ACLFilter:
    def __init__(self, principal=None, host=None, operation=None,
                 permission_type=None, resource_pattern_filter=None):
        """
        ACL filter for queries (allows ANY values).

        Parameters:
        - principal: str|ACLFilter.ANY, principal filter
        - host: str|ACLFilter.ANY, host filter
        - operation: ACLOperation|ACLFilter.ANY, operation filter
        - permission_type: ACLPermissionType|ACLFilter.ANY, permission filter
        - resource_pattern_filter: ResourcePatternFilter, resource filter
        """
        self.principal = principal
        self.host = host
        self.operation = operation
        self.permission_type = permission_type
        self.resource_pattern_filter = resource_pattern_filter
308
309
class ResourcePattern:
    def __init__(self, resource_type, resource_name, pattern_type):
        """
        Resource pattern specification.

        Parameters:
        - resource_type: ResourceType, type of resource
        - resource_name: str, resource name pattern
        - pattern_type: ACLResourcePatternType, pattern matching type
        """
        self.resource_type = resource_type
        self.resource_name = resource_name
        self.pattern_type = pattern_type
322
323
class ResourcePatternFilter:
    def __init__(self, resource_type=None, resource_name=None, pattern_type=None):
        """Resource pattern filter (allows ANY values)."""
        self.resource_type = resource_type
        self.resource_name = resource_name
        self.pattern_type = pattern_type
329
```
330
331
### ACL Enumerations
332
333
```python { .api }
334
class ACLOperation:
    # Values are the Kafka protocol's ACL operation codes (0 is UNKNOWN on
    # the wire and is not exposed here).
    ANY = 1
    ALL = 2
    READ = 3
    WRITE = 4
    CREATE = 5
    DELETE = 6
    ALTER = 7
    DESCRIBE = 8
    CLUSTER_ACTION = 9
    DESCRIBE_CONFIGS = 10
    ALTER_CONFIGS = 11
    IDEMPOTENT_WRITE = 12
347
348
class ResourceType:
    # Values are the Kafka protocol's resource type codes.
    UNKNOWN = 0
    ANY = 1
    TOPIC = 2
    GROUP = 3
    CLUSTER = 4
    TRANSACTIONAL_ID = 5
    DELEGATION_TOKEN = 6
356
357
class ACLPermissionType:
    # Values are the Kafka protocol's ACL permission type codes (0 is
    # UNKNOWN on the wire and is not exposed here).
    ANY = 1
    DENY = 2
    ALLOW = 3
361
362
class ACLResourcePatternType:
    # Values are the Kafka protocol's resource pattern type codes (0 is
    # UNKNOWN on the wire and is not exposed here).
    ANY = 1
    MATCH = 2
    LITERAL = 3
    PREFIXED = 4
367
```
368
369
### Consumer Group Management
370
371
Classes for consumer group administration.
372
373
```python { .api }
374
class GroupDescription:
    group_id: str                     # Group ID
    is_simple_consumer_group: bool    # Simple consumer group flag
    members: List[MemberDescription]  # Group members
    partition_assignor: str           # Partition assignment strategy
    state: str                        # Group state
    coordinator: Node                 # Group coordinator
    authorized_operations: List[int]  # Authorized operations
382
383
class MemberDescription:
    member_id: str                # Member ID
    client_id: str                # Client ID
    host: str                     # Client host
    assignment: MemberAssignment  # Partition assignment
388
389
class MemberAssignment:
    topic_partitions: Set[TopicPartition]  # Assigned partitions
391
```
392
393
## Usage Examples
394
395
### Topic Management
396
397
```python
398
from kafka import KafkaAdminClient
from kafka.admin import NewTopic, NewPartitions
from kafka.errors import TopicAlreadyExistsError, KafkaError

# Create admin client
admin = KafkaAdminClient(
    bootstrap_servers=['localhost:9092'],
    client_id='admin-client'
)

try:
    # Create topics
    topics = [
        NewTopic(name='events', num_partitions=3, replication_factor=1),
        NewTopic(name='logs', num_partitions=6, replication_factor=1,
                 topic_configs={'retention.ms': '86400000'})  # 1 day retention
    ]

    create_result = admin.create_topics(topics, timeout_ms=30000)

    # Wait for results
    for topic, future in create_result.items():
        try:
            future.result()  # Block until completion
            print(f"Topic '{topic}' created successfully")
        except TopicAlreadyExistsError:
            print(f"Topic '{topic}' already exists")
        except KafkaError as e:
            print(f"Failed to create topic '{topic}': {e}")

    # List topics (separate name so the NewTopic list above is not rebound)
    available_topics = admin.list_topics(timeout_ms=10000)
    print(f"Available topics: {list(available_topics)}")

    # Add partitions to existing topic
    partition_updates = {
        'events': NewPartitions(total_count=5)  # Increase from 3 to 5 partitions
    }

    partition_result = admin.create_partitions(partition_updates, timeout_ms=30000)
    for topic, future in partition_result.items():
        try:
            future.result()
            print(f"Added partitions to topic '{topic}'")
        except KafkaError as e:
            print(f"Failed to add partitions to '{topic}': {e}")

finally:
    admin.close()
447
```
448
449
### Topic Description and Metadata
450
451
```python
452
from kafka import KafkaAdminClient

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

try:
    # Get detailed topic information
    topic_descriptions = admin.describe_topics(['events', 'logs'], timeout_ms=10000)

    for topic_name, description in topic_descriptions.items():
        print(f"\nTopic: {topic_name}")
        print(f"Internal: {description.is_internal}")
        print(f"Partitions: {len(description.partitions)}")

        for partition in description.partitions:
            print(f"  Partition {partition.partition}:")
            print(f"    Leader: {partition.leader}")
            print(f"    Replicas: {partition.replicas}")
            print(f"    ISR: {partition.isr}")

finally:
    admin.close()
473
```
474
475
### Configuration Management
476
477
```python
478
from kafka import KafkaAdminClient
from kafka.admin import ConfigResource, ConfigResourceType
from kafka.errors import KafkaError  # caught below when altering configs

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

try:
    # Describe topic configurations
    topic_resource = ConfigResource(ConfigResourceType.TOPIC, 'events')
    broker_resource = ConfigResource(ConfigResourceType.BROKER, '0')

    config_results = admin.describe_configs([topic_resource, broker_resource],
                                            timeout_ms=10000)

    for resource, result in config_results.items():
        print(f"\nConfigurations for {resource.resource_type} '{resource.name}':")
        for name, entry in result.configs.items():
            if not entry.is_default:  # Only show non-default configs
                print(f"  {name} = {entry.value}")

    # Alter topic configuration
    config_updates = {
        topic_resource: {
            'retention.ms': '172800000',  # 2 days
            'cleanup.policy': 'delete'
        }
    }

    alter_result = admin.alter_configs(config_updates, timeout_ms=30000)
    for resource, future in alter_result.items():
        try:
            future.result()
            print(f"Configuration updated for {resource.name}")
        except KafkaError as e:
            print(f"Failed to update configuration: {e}")

finally:
    admin.close()
515
```
516
517
### Access Control Lists (ACL)
518
519
```python
520
from kafka import KafkaAdminClient
from kafka.admin import (ACL, ACLFilter, ResourcePattern, ResourcePatternFilter,
                         ACLOperation, ACLPermissionType, ResourceType,
                         ACLResourcePatternType)
from kafka.errors import KafkaError  # caught below when creating ACLs

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

try:
    # Create ACLs
    acls = [
        ACL(
            principal='User:alice',
            host='*',
            operation=ACLOperation.READ,
            permission_type=ACLPermissionType.ALLOW,
            resource_pattern=ResourcePattern(
                resource_type=ResourceType.TOPIC,
                resource_name='events',
                pattern_type=ACLResourcePatternType.LITERAL
            )
        ),
        ACL(
            principal='User:bob',
            # Kafka ACL hosts are an exact IP address or the single
            # wildcard '*'; partial patterns such as '192.168.1.*' never match.
            host='192.168.1.10',
            operation=ACLOperation.WRITE,
            permission_type=ACLPermissionType.ALLOW,
            resource_pattern=ResourcePattern(
                resource_type=ResourceType.TOPIC,
                # PREFIXED matches names starting with this literal prefix,
                # so no trailing '*' is used.
                resource_name='logs-',
                pattern_type=ACLResourcePatternType.PREFIXED
            )
        )
    ]

    create_result = admin.create_acls(acls, timeout_ms=30000)
    for acl, future in create_result.items():
        try:
            future.result()
            print(f"ACL created for {acl.principal}")
        except KafkaError as e:
            print(f"Failed to create ACL: {e}")

    # List ACLs
    acl_filter = ACLFilter(
        resource_pattern_filter=ResourcePatternFilter(
            resource_type=ResourceType.TOPIC
        )
    )

    acl_bindings = admin.describe_acls(acl_filter, timeout_ms=10000)
    print(f"\nFound {len(acl_bindings)} ACLs:")
    for binding in acl_bindings:
        print(f"  {binding.principal} {binding.permission_type} "
              f"{binding.operation} on {binding.resource_pattern.resource_name}")

finally:
    admin.close()
577
```
578
579
### Consumer Group Management
580
581
```python
582
from kafka import KafkaAdminClient
from kafka.errors import KafkaError  # caught below when deleting groups

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

try:
    # List consumer groups
    groups = admin.list_consumer_groups(timeout_ms=10000)
    print(f"Found {len(groups)} consumer groups:")
    for group in groups:
        print(f"  Group: {group.group}, State: {group.state}")

    # Describe specific consumer groups
    group_ids = ['my-consumer-group', 'batch-processor']
    descriptions = admin.describe_consumer_groups(group_ids, timeout_ms=10000)

    for group_id, description in descriptions.items():
        print(f"\nGroup: {group_id}")
        print(f"State: {description.state}")
        print(f"Coordinator: {description.coordinator}")
        print(f"Assignment Strategy: {description.partition_assignor}")
        print(f"Members: {len(description.members)}")

        for member in description.members:
            print(f"  Member: {member.member_id}")
            print(f"    Client: {member.client_id}")
            print(f"    Host: {member.host}")
            print(f"    Partitions: {len(member.assignment.topic_partitions)}")

    # Delete inactive consumer group
    delete_result = admin.delete_consumer_groups(['inactive-group'], timeout_ms=30000)
    for group_id, future in delete_result.items():
        try:
            future.result()
            print(f"Consumer group '{group_id}' deleted")
        except KafkaError as e:
            print(f"Failed to delete group '{group_id}': {e}")

finally:
    admin.close()
621
```
622
623
### Cluster Information
624
625
```python
626
from kafka import KafkaAdminClient, TopicPartition  # TopicPartition keys the leader lookup below
from kafka.client_async import KafkaClient

# Using admin client for high-level operations
admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

# Using low-level client for detailed cluster info
client = KafkaClient(bootstrap_servers=['localhost:9092'])

try:
    # Wait for client to connect and load metadata
    client.poll(timeout_ms=5000)

    # Get cluster metadata
    cluster = client.cluster

    print("Cluster Information:")
    print(f"Cluster ID: {cluster.cluster_id}")
    print(f"Controller: {cluster.controller}")

    print(f"\nBrokers ({len(cluster.brokers())}):")
    for broker in cluster.brokers():
        print(f"  Broker {broker.nodeId}: {broker.host}:{broker.port}")
        if broker.rack:
            print(f"    Rack: {broker.rack}")

    print(f"\nTopics ({len(cluster.topics())}):")
    for topic in sorted(cluster.topics()):
        partitions = cluster.partitions_for_topic(topic)
        print(f"  {topic}: {len(partitions)} partitions")

        for partition_id in sorted(partitions):
            partition = cluster.leader_for_partition(TopicPartition(topic, partition_id))
            print(f"    Partition {partition_id}: Leader {partition}")

finally:
    admin.close()
    client.close()
664
```
665
666
### Batch Operations
667
668
```python
669
from kafka import KafkaAdminClient
from kafka.admin import NewTopic
import concurrent.futures

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

try:
    # Create multiple topics concurrently
    topics = [
        NewTopic(f'partition-{i}', num_partitions=i+1, replication_factor=1)
        for i in range(10)
    ]

    create_result = admin.create_topics(topics, timeout_ms=60000)

    # Process results as they complete
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Submit all futures
        future_to_topic = {
            executor.submit(future.result): topic_name
            for topic_name, future in create_result.items()
        }

        # Process completed futures
        for future in concurrent.futures.as_completed(future_to_topic, timeout=60):
            topic_name = future_to_topic[future]
            try:
                future.result()
                print(f"✓ Topic '{topic_name}' created")
            except Exception as e:
                print(f"✗ Topic '{topic_name}' failed: {e}")

finally:
    admin.close()
703
```