0
# Admin Client
1
2
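The AdminClient provides comprehensive administrative operations for managing Kafka clusters, including topics, partitions, configurations, ACLs, consumer groups, and SCRAM credentials. Most operations are asynchronous: they return a `concurrent.futures.Future`, or a dict of futures keyed by the requested resource, so that multiple requests can execute concurrently. The exceptions are `list_topics`, which returns a `ClusterMetadata` object directly, and `set_sasl_credentials`, which returns nothing.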
3
4
## Capabilities
5
6
### AdminClient
7
8
Main administrative client for Kafka cluster management.
9
10
```python { .api }
11
class AdminClient:
12
def __init__(self, conf):
13
"""
14
Create AdminClient instance.
15
16
Args:
17
conf (dict): Configuration properties for the admin client
18
"""
19
20
def create_topics(self, new_topics, **kwargs):
21
"""
22
Create topics.
23
24
Args:
25
new_topics (list): List of NewTopic objects
26
**kwargs: Additional options (validate_only, request_timeout, operation_timeout)
27
28
Returns:
29
dict: Future objects keyed by topic name
30
"""
31
32
def delete_topics(self, topics, **kwargs):
33
"""
34
Delete topics.
35
36
Args:
37
topics (list): List of topic names to delete
38
**kwargs: Additional options (request_timeout, operation_timeout)
39
40
Returns:
41
dict: Future objects keyed by topic name
42
"""
43
44
def list_topics(self, topic=None, timeout=-1):
45
"""
46
Get metadata for topics.
47
48
Args:
49
topic (str, optional): Specific topic name
50
timeout (float): Request timeout in seconds
51
52
Returns:
53
ClusterMetadata: Cluster and topic metadata
54
"""
55
56
def describe_topics(self, topic_names, **kwargs):
57
"""
58
Describe topics.
59
60
Args:
61
topic_names (list): List of topic names to describe
62
**kwargs: Additional options (request_timeout)
63
64
Returns:
65
dict: Future objects keyed by topic name
66
"""
67
68
def create_partitions(self, fs, **kwargs):
69
"""
70
Create additional partitions for topics.
71
72
Args:
73
fs (list): List of NewPartitions objects
74
**kwargs: Additional options (validate_only, request_timeout, operation_timeout)
75
76
Returns:
77
dict: Future objects keyed by topic name
78
"""
79
80
def describe_configs(self, resources, **kwargs):
81
"""
82
Describe configuration for resources.
83
84
Args:
85
resources (list): List of ConfigResource objects
86
**kwargs: Additional options (request_timeout)
87
88
Returns:
89
dict: Future objects keyed by ConfigResource
90
"""
91
92
def alter_configs(self, resources, **kwargs):
93
"""
94
Alter configuration for resources.
95
96
Args:
97
resources (list): List of ConfigResource objects whose configuration is to be updated
98
**kwargs: Additional options (validate_only, request_timeout)
99
100
Returns:
101
dict: Future objects keyed by ConfigResource
102
"""
103
104
def incremental_alter_configs(self, resources, **kwargs):
105
"""
106
Incrementally alter configuration for resources.
107
108
Args:
109
resources (list): List of ConfigResource objects whose incremental_configs hold ConfigEntry items, each with an AlterConfigOpType
110
**kwargs: Additional options (validate_only, request_timeout)
111
112
Returns:
113
dict: Future objects keyed by ConfigResource
114
"""
115
116
def create_acls(self, acl_bindings, **kwargs):
117
"""
118
Create ACL bindings.
119
120
Args:
121
acl_bindings (list): List of AclBinding objects
122
**kwargs: Additional options (request_timeout)
123
124
Returns:
125
dict: Future objects keyed by AclBinding
126
"""
127
128
def describe_acls(self, acl_binding_filter, **kwargs):
129
"""
130
Describe ACL bindings.
131
132
Args:
133
acl_binding_filter (AclBindingFilter): Filter for ACL bindings
134
**kwargs: Additional options (request_timeout)
135
136
Returns:
137
concurrent.futures.Future: Future with AclBinding results
138
"""
139
140
def delete_acls(self, acl_binding_filters, **kwargs):
141
"""
142
Delete ACL bindings.
143
144
Args:
145
acl_binding_filters (list): List of AclBindingFilter objects
146
**kwargs: Additional options (request_timeout)
147
148
Returns:
149
dict: Future objects keyed by AclBindingFilter
150
"""
151
152
def list_consumer_groups(self, **kwargs):
153
"""
154
List consumer groups.
155
156
Args:
157
**kwargs: Additional options (request_timeout, states)
158
159
Returns:
160
concurrent.futures.Future: Future with ListConsumerGroupsResult
161
"""
162
163
def describe_consumer_groups(self, group_ids, **kwargs):
164
"""
165
Describe consumer groups.
166
167
Args:
168
group_ids (list): List of consumer group IDs
169
**kwargs: Additional options (request_timeout, include_authorized_operations)
170
171
Returns:
172
dict: Future objects keyed by group ID
173
"""
174
175
def delete_consumer_groups(self, group_ids, **kwargs):
176
"""
177
Delete consumer groups.
178
179
Args:
180
group_ids (list): List of consumer group IDs to delete
181
**kwargs: Additional options (request_timeout)
182
183
Returns:
184
dict: Future objects keyed by group ID
185
"""
186
187
def list_consumer_group_offsets(self, request, **kwargs):
188
"""
189
List consumer group offsets.
190
191
Args:
192
request (ConsumerGroupTopicPartitions or list): Group and partitions to query
193
**kwargs: Additional options (request_timeout, require_stable)
194
195
Returns:
196
dict: Future objects keyed by group ID; each future's result is a ConsumerGroupTopicPartitions
197
"""
198
199
def alter_consumer_group_offsets(self, group_topic_partitions, **kwargs):
200
"""
201
Alter consumer group offsets.
202
203
Args:
204
group_topic_partitions (list): List of ConsumerGroupTopicPartitions
205
**kwargs: Additional options (request_timeout)
206
207
Returns:
208
dict: Future objects keyed by group ID; each future's result is a ConsumerGroupTopicPartitions
209
"""
210
211
def describe_user_scram_credentials(self, users=None, **kwargs):
212
"""
213
Describe SCRAM credentials for users.
214
215
Args:
216
users (list, optional): List of usernames (None for all)
217
**kwargs: Additional options (request_timeout)
218
219
Returns:
220
dict: Future objects keyed by username
221
"""
222
223
def alter_user_scram_credentials(self, alterations, **kwargs):
224
"""
225
Alter SCRAM credentials for users.
226
227
Args:
228
alterations (list): List of UserScramCredentialAlteration objects
229
**kwargs: Additional options (request_timeout)
230
231
Returns:
232
dict: Future objects keyed by username
233
"""
234
235
def describe_cluster(self, **kwargs):
236
"""
237
Describe cluster information.
238
239
Args:
240
**kwargs: Additional options (request_timeout, include_authorized_operations)
241
242
Returns:
243
concurrent.futures.Future: Future with DescribeClusterResult
244
"""
245
246
def list_offsets(self, topic_partition_offsets, **kwargs):
247
"""
248
List offsets for topic partitions.
249
250
Args:
251
topic_partition_offsets (dict): Dict of TopicPartition to OffsetSpec
252
**kwargs: Additional options (request_timeout, isolation_level)
253
254
Returns:
255
dict: Future objects keyed by TopicPartition
256
"""
257
258
def delete_records(self, topic_partition_offsets, **kwargs):
259
"""
260
Delete records before specified offsets.
261
262
Args:
263
topic_partition_offsets (list): List of TopicPartition objects with offset set to the offset before which records are deleted
264
**kwargs: Additional options (request_timeout)
265
266
Returns:
267
dict: Future objects keyed by TopicPartition
268
"""
269
270
def elect_leaders(self, election_type, partitions=None, **kwargs):
271
"""
272
Elect leaders for topic partitions.
273
274
Args:
275
election_type (ElectionType): Type of election (PREFERRED or UNCLEAN)
276
partitions (list, optional): List of TopicPartition objects (None for all partitions)
277
**kwargs: Additional options (request_timeout)
278
279
Returns:
280
concurrent.futures.Future: Future with election results
281
"""
282
283
def set_sasl_credentials(self, username, password):
284
"""
285
Set SASL credentials for authentication.
286
287
Args:
288
username (str): SASL username
289
password (str): SASL password
290
"""
291
```
292
293
### Topic Management Classes
294
295
#### NewTopic
296
297
Specification for creating new topics.
298
299
```python { .api }
300
class NewTopic:
301
def __init__(self, topic, num_partitions=None, replication_factor=None, replica_assignment=None, config=None):
302
"""
303
Create NewTopic specification.
304
305
Args:
306
topic (str): Topic name
307
num_partitions (int, optional): Number of partitions
308
replication_factor (int, optional): Replication factor
309
replica_assignment (dict, optional): Manual replica assignment
310
config (dict, optional): Topic configuration
311
"""
312
313
@property
314
def topic(self):
315
"""Topic name."""
316
317
@property
318
def num_partitions(self):
319
"""Number of partitions."""
320
321
@property
322
def replication_factor(self):
323
"""Replication factor."""
324
325
@property
326
def replica_assignment(self):
327
"""Replica assignment."""
328
329
@property
330
def config(self):
331
"""Topic configuration."""
332
```
333
334
#### NewPartitions
335
336
Specification for adding partitions to existing topics.
337
338
```python { .api }
339
class NewPartitions:
340
def __init__(self, topic, new_total_count, replica_assignment=None):
341
"""
342
Create NewPartitions specification.
343
344
Args:
345
topic (str): Topic name
346
new_total_count (int): New total partition count
347
replica_assignment (list, optional): Replica assignment for new partitions
348
"""
349
350
@property
351
def topic(self):
352
"""Topic name."""
353
354
@property
355
def new_total_count(self):
356
"""New total partition count."""
357
358
@property
359
def replica_assignment(self):
360
"""Replica assignment."""
361
```
362
363
### Configuration Management Classes
364
365
#### ConfigResource
366
367
Represents a configuration resource.
368
369
```python { .api }
370
class ConfigResource:
371
def __init__(self, restype, name, incremental_configs=None):
372
"""
373
Create ConfigResource.
374
375
Args:
376
restype (int): Resource type (RESOURCE_TOPIC, RESOURCE_BROKER, etc.)
377
name (str): Resource name
378
incremental_configs (list, optional): Incremental configuration entries
379
"""
380
381
@property
382
def restype(self):
383
"""Resource type."""
384
385
@property
386
def name(self):
387
"""Resource name."""
388
389
@property
390
def incremental_configs(self):
391
"""Incremental configuration entries."""
392
393
def __hash__(self):
394
"""Hash for use in dicts."""
395
396
def __eq__(self, other):
397
"""Equality comparison."""
398
```
399
400
#### ConfigEntry
401
402
Represents a configuration entry.
403
404
```python { .api }
405
class ConfigEntry:
406
def __init__(self, name, value, incremental_operation=None):
407
"""
408
Create ConfigEntry.
409
410
Args:
411
name (str): Configuration name
412
value (str): Configuration value
413
incremental_operation (AlterConfigOpType, optional): Operation type for incremental updates
414
"""
415
416
@property
417
def name(self):
418
"""Configuration name."""
419
420
@property
421
def value(self):
422
"""Configuration value."""
423
424
@property
425
def incremental_operation(self):
426
"""Incremental operation type."""
427
428
@property
429
def source(self):
430
"""Configuration source."""
431
432
@property
433
def is_default(self):
434
"""Whether this is a default configuration."""
435
436
@property
437
def is_read_only(self):
438
"""Whether this configuration is read-only."""
439
440
@property
441
def is_sensitive(self):
442
"""Whether this configuration is sensitive."""
443
444
@property
445
def synonyms(self):
446
"""Configuration synonyms."""
447
```
448
449
### ACL Management Classes
450
451
#### AclBinding
452
453
Represents an ACL binding.
454
455
```python { .api }
456
class AclBinding:
457
def __init__(self, restype, name, resource_pattern_type, principal, host, operation, permission_type):
458
"""
459
Create AclBinding.
460
461
Args:
462
restype (ResourceType): Resource type
463
name (str): Resource name
464
resource_pattern_type (ResourcePatternType): Pattern type
465
principal (str): Principal (user/service)
466
host (str): Host pattern
467
operation (AclOperation): ACL operation
468
permission_type (AclPermissionType): Permission type
469
"""
470
471
@property
472
def restype(self):
473
"""Resource type."""
474
475
@property
476
def name(self):
477
"""Resource name."""
478
479
@property
480
def resource_pattern_type(self):
481
"""Resource pattern type."""
482
483
@property
484
def principal(self):
485
"""Principal."""
486
487
@property
488
def host(self):
489
"""Host pattern."""
490
491
@property
492
def operation(self):
493
"""ACL operation."""
494
495
@property
496
def permission_type(self):
497
"""Permission type."""
498
```
499
500
#### AclBindingFilter
501
502
Filter for ACL bindings.
503
504
```python { .api }
505
class AclBindingFilter:
506
def __init__(self, restype, name, resource_pattern_type, principal, host, operation, permission_type):
507
"""
508
Create AclBindingFilter.
509
510
Args:
511
restype (ResourceType): Resource type (can be ANY)
512
name (str): Resource name (can be None for any)
513
resource_pattern_type (ResourcePatternType): Pattern type (can be ANY)
514
principal (str): Principal (can be None for any)
515
host (str): Host pattern (can be None for any)
516
operation (AclOperation): ACL operation (can be ANY)
517
permission_type (AclPermissionType): Permission type (can be ANY)
518
"""
519
```
520
521
### Consumer Group Management Classes
522
523
#### ConsumerGroupListing
524
525
Information about a consumer group.
526
527
```python { .api }
528
class ConsumerGroupListing:
529
@property
530
def group_id(self):
531
"""Consumer group ID."""
532
533
@property
534
def is_simple_consumer_group(self):
535
"""Whether this is a simple consumer group."""
536
537
@property
538
def state(self):
539
"""Consumer group state."""
540
541
@property
542
def type(self):
543
"""Consumer group type."""
544
```
545
546
#### ConsumerGroupDescription
547
548
Detailed description of a consumer group.
549
550
```python { .api }
551
class ConsumerGroupDescription:
552
@property
553
def group_id(self):
554
"""Consumer group ID."""
555
556
@property
557
def is_simple_consumer_group(self):
558
"""Whether this is a simple consumer group."""
559
560
@property
561
def members(self):
562
"""List of group members."""
563
564
@property
565
def partition_assignor(self):
566
"""Partition assignor strategy."""
567
568
@property
569
def state(self):
570
"""Consumer group state."""
571
572
@property
573
def coordinator(self):
574
"""Group coordinator node."""
575
576
@property
577
def authorized_operations(self):
578
"""Authorized operations for this group."""
579
```
580
581
#### MemberDescription
582
583
Description of a consumer group member.
584
585
```python { .api }
586
class MemberDescription:
587
@property
588
def member_id(self):
589
"""Member ID."""
590
591
@property
592
def group_instance_id(self):
593
"""Group instance ID."""
594
595
@property
596
def client_id(self):
597
"""Client ID."""
598
599
@property
600
def host(self):
601
"""Member host."""
602
603
@property
604
def assignment(self):
605
"""Member assignment."""
606
```
607
608
### SCRAM Credential Management
609
610
#### UserScramCredentialAlteration
611
612
Base class for SCRAM credential alterations.
613
614
```python { .api }
615
class UserScramCredentialAlteration:
616
def __init__(self, user):
617
"""
618
Base class for SCRAM credential alterations.
619
620
Args:
621
user (str): Username
622
"""
623
624
@property
625
def user(self):
626
"""Username."""
627
```
628
629
#### UserScramCredentialUpsertion
630
631
SCRAM credential creation or update.
632
633
```python { .api }
634
class UserScramCredentialUpsertion(UserScramCredentialAlteration):
635
def __init__(self, user, scram_credential_info):
636
"""
637
Create or update SCRAM credentials.
638
639
Args:
640
user (str): Username
641
scram_credential_info (ScramCredentialInfo): Credential information
642
"""
643
644
@property
645
def scram_credential_info(self):
646
"""SCRAM credential information."""
647
```
648
649
#### UserScramCredentialDeletion
650
651
SCRAM credential deletion.
652
653
```python { .api }
654
class UserScramCredentialDeletion(UserScramCredentialAlteration):
655
def __init__(self, user, mechanism):
656
"""
657
Delete SCRAM credentials.
658
659
Args:
660
user (str): Username
661
mechanism (ScramMechanism): SCRAM mechanism to delete
662
"""
663
664
@property
665
def mechanism(self):
666
"""SCRAM mechanism."""
667
```
668
669
### Enumeration Classes
670
671
```python { .api }
672
class ResourceType:
673
UNKNOWN = 0
674
ANY = 1
675
TOPIC = 2
676
GROUP = 3
677
CLUSTER = 4
678
TRANSACTIONAL_ID = 5
679
DELEGATION_TOKEN = 6
680
USER = 7
681
682
class ResourcePatternType:
683
UNKNOWN = 0
684
ANY = 1
685
MATCH = 2
686
LITERAL = 3
687
PREFIXED = 4
688
689
class AclOperation:
690
UNKNOWN = 0
691
ANY = 1
692
ALL = 2
693
READ = 3
694
WRITE = 4
695
CREATE = 5
696
DELETE = 6
697
ALTER = 7
698
DESCRIBE = 8
699
CLUSTER_ACTION = 9
700
DESCRIBE_CONFIGS = 10
701
ALTER_CONFIGS = 11
702
IDEMPOTENT_WRITE = 12
703
704
class AclPermissionType:
705
UNKNOWN = 0
706
ANY = 1
707
DENY = 2
708
ALLOW = 3
709
710
class ConfigSource:
711
UNKNOWN_CONFIG = 0
712
DYNAMIC_TOPIC_CONFIG = 1
713
DYNAMIC_BROKER_CONFIG = 2
714
DYNAMIC_DEFAULT_BROKER_CONFIG = 3
715
STATIC_BROKER_CONFIG = 4
716
DEFAULT_CONFIG = 5
717
718
class AlterConfigOpType:
719
SET = 0
720
DELETE = 1
721
APPEND = 2
722
SUBTRACT = 3
723
724
class ScramMechanism:
725
SCRAM_SHA_256 = 0
726
SCRAM_SHA_512 = 1
727
728
class ElectionType:
729
PREFERRED = 0
730
UNCLEAN = 1
731
```
732
733
### Usage Examples
734
735
#### Creating Topics
736
737
```python
738
from confluent_kafka.admin import AdminClient, NewTopic
739
740
admin_client = AdminClient({'bootstrap.servers': 'localhost:9092'})
741
742
# Create topics
743
new_topics = [
744
NewTopic('my-topic-1', num_partitions=3, replication_factor=1),
745
NewTopic('my-topic-2', num_partitions=6, replication_factor=1, config={'cleanup.policy': 'compact'})
746
]
747
748
fs = admin_client.create_topics(new_topics, request_timeout=30)
749
750
# Wait for results
751
for topic, f in fs.items():
752
try:
753
f.result() # The result itself is None
754
print(f"Topic {topic} created")
755
except Exception as e:
756
print(f"Failed to create topic {topic}: {e}")
757
```
758
759
#### Managing Consumer Groups
760
761
```python
762
from confluent_kafka.admin import AdminClient
763
764
admin_client = AdminClient({'bootstrap.servers': 'localhost:9092'})
765
766
# List consumer groups
767
fs = admin_client.list_consumer_groups(request_timeout=10)
768
try:
769
result = fs.result()
770
for group_listing in result.valid:
771
print(f"Group: {group_listing.group_id}, State: {group_listing.state}")
772
except Exception as e:
773
print(f"Failed to list consumer groups: {e}")
774
775
# Describe specific consumer groups
776
group_ids = ['my-group-1', 'my-group-2']
777
fs = admin_client.describe_consumer_groups(group_ids, request_timeout=10)
778
779
for group_id, f in fs.items():
780
try:
781
group_desc = f.result()
782
print(f"Group {group_id}: {len(group_desc.members)} members")
783
for member in group_desc.members:
784
print(f" Member: {member.member_id} on {member.host}")
785
except Exception as e:
786
print(f"Failed to describe group {group_id}: {e}")
787
```
788
789
#### Managing ACLs
790
791
```python
792
from confluent_kafka.admin import AdminClient, AclBinding, AclBindingFilter
793
from confluent_kafka.admin import ResourceType, ResourcePatternType, AclOperation, AclPermissionType
794
795
admin_client = AdminClient({'bootstrap.servers': 'localhost:9092'})
796
797
# Create ACL binding
798
acl_binding = AclBinding(
799
restype=ResourceType.TOPIC,
800
name='my-topic',
801
resource_pattern_type=ResourcePatternType.LITERAL,
802
principal='User:alice',
803
host='*',
804
operation=AclOperation.READ,
805
permission_type=AclPermissionType.ALLOW
806
)
807
808
fs = admin_client.create_acls([acl_binding], request_timeout=10)
809
for acl, f in fs.items():
810
try:
811
f.result()
812
print(f"ACL created for {acl.principal}")
813
except Exception as e:
814
print(f"Failed to create ACL: {e}")
815
816
# List ACLs
817
acl_filter = AclBindingFilter(
818
restype=ResourceType.TOPIC,
819
name=None, # All topics
820
resource_pattern_type=ResourcePatternType.ANY,
821
principal=None, # All principals
822
host=None, # All hosts
823
operation=AclOperation.ANY,
824
permission_type=AclPermissionType.ANY
825
)
826
827
fs = admin_client.describe_acls(acl_filter, request_timeout=10)
828
try:
829
acl_bindings = fs.result()
830
for acl in acl_bindings:
831
print(f"ACL: {acl.principal} {acl.permission_type} {acl.operation} on {acl.name}")
832
except Exception as e:
833
print(f"Failed to list ACLs: {e}")
834
```