or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

admin-client.mdcore-producer-consumer.mderror-handling.mdindex.mdschema-registry.mdserialization.md

admin-client.mddocs/

0

# Admin Client

1

2

The AdminClient provides comprehensive administrative operations for managing Kafka clusters including topics, partitions, configurations, ACLs, consumer groups, and SCRAM credentials. All operations are asynchronous and return futures for concurrent execution.

3

4

## Capabilities

5

6

### AdminClient

7

8

Main administrative client for Kafka cluster management.

9

10

```python { .api }

11

class AdminClient:

12

def __init__(self, conf):

13

"""

14

Create AdminClient instance.

15

16

Args:

17

conf (dict): Configuration properties for the admin client

18

"""

19

20

def create_topics(self, new_topics, **kwargs):

21

"""

22

Create topics.

23

24

Args:

25

new_topics (list): List of NewTopic objects

26

**kwargs: Additional options (validate_only, request_timeout, operation_timeout)

27

28

Returns:

29

dict: Future objects keyed by topic name

30

"""

31

32

def delete_topics(self, topics, **kwargs):

33

"""

34

Delete topics.

35

36

Args:

37

topics (list): List of topic names to delete

38

**kwargs: Additional options (request_timeout, operation_timeout)

39

40

Returns:

41

dict: Future objects keyed by topic name

42

"""

43

44

def list_topics(self, topic=None, timeout=-1):

45

"""

46

Get metadata for topics.

47

48

Args:

49

topic (str, optional): Specific topic name

50

timeout (float): Request timeout in seconds

51

52

Returns:

53

ClusterMetadata: Cluster and topic metadata

54

"""

55

56

def describe_topics(self, topic_names, **kwargs):

57

"""

58

Describe topics.

59

60

Args:

61

topic_names (list): List of topic names to describe

62

**kwargs: Additional options (request_timeout)

63

64

Returns:

65

dict: Future objects keyed by topic name

66

"""

67

68

def create_partitions(self, fs, **kwargs):

69

"""

70

Create additional partitions for topics.

71

72

Args:

73

fs (list): List of NewPartitions objects

74

**kwargs: Additional options (validate_only, request_timeout, operation_timeout)

75

76

Returns:

77

dict: Future objects keyed by topic name

78

"""

79

80

def describe_configs(self, resources, **kwargs):

81

"""

82

Describe configuration for resources.

83

84

Args:

85

resources (list): List of ConfigResource objects

86

**kwargs: Additional options (request_timeout)

87

88

Returns:

89

dict: Future objects keyed by ConfigResource

90

"""

91

92

def alter_configs(self, resources, **kwargs):

93

"""

94

Alter configuration for resources.

95

96

Args:

97

resources (dict): Dict of ConfigResource to list of ConfigEntry

98

**kwargs: Additional options (validate_only, request_timeout)

99

100

Returns:

101

dict: Future objects keyed by ConfigResource

102

"""

103

104

def incremental_alter_configs(self, resources, **kwargs):

105

"""

106

Incrementally alter configuration for resources.

107

108

Args:

109

resources (dict): Dict of ConfigResource to list of ConfigEntry with AlterConfigOpType

110

**kwargs: Additional options (validate_only, request_timeout)

111

112

Returns:

113

dict: Future objects keyed by ConfigResource

114

"""

115

116

def create_acls(self, acl_bindings, **kwargs):

117

"""

118

Create ACL bindings.

119

120

Args:

121

acl_bindings (list): List of AclBinding objects

122

**kwargs: Additional options (request_timeout)

123

124

Returns:

125

dict: Future objects keyed by AclBinding

126

"""

127

128

def describe_acls(self, acl_binding_filter, **kwargs):

129

"""

130

Describe ACL bindings.

131

132

Args:

133

acl_binding_filter (AclBindingFilter): Filter for ACL bindings

134

**kwargs: Additional options (request_timeout)

135

136

Returns:

137

concurrent.futures.Future: Future with AclBinding results

138

"""

139

140

def delete_acls(self, acl_binding_filters, **kwargs):

141

"""

142

Delete ACL bindings.

143

144

Args:

145

acl_binding_filters (list): List of AclBindingFilter objects

146

**kwargs: Additional options (request_timeout)

147

148

Returns:

149

dict: Future objects keyed by AclBindingFilter

150

"""

151

152

def list_consumer_groups(self, **kwargs):

153

"""

154

List consumer groups.

155

156

Args:

157

**kwargs: Additional options (request_timeout, states)

158

159

Returns:

160

concurrent.futures.Future: Future with ListConsumerGroupsResult

161

"""

162

163

def describe_consumer_groups(self, group_ids, **kwargs):

164

"""

165

Describe consumer groups.

166

167

Args:

168

group_ids (list): List of consumer group IDs

169

**kwargs: Additional options (request_timeout, include_authorized_operations)

170

171

Returns:

172

dict: Future objects keyed by group ID

173

"""

174

175

def delete_consumer_groups(self, group_ids, **kwargs):

176

"""

177

Delete consumer groups.

178

179

Args:

180

group_ids (list): List of consumer group IDs to delete

181

**kwargs: Additional options (request_timeout)

182

183

Returns:

184

dict: Future objects keyed by group ID

185

"""

186

187

def list_consumer_group_offsets(self, request, **kwargs):

188

"""

189

List consumer group offsets.

190

191

Args:

192

request (ConsumerGroupTopicPartitions or list): Group and partitions to query

193

**kwargs: Additional options (request_timeout, require_stable)

194

195

Returns:

196

dict: Future objects keyed by ConsumerGroupTopicPartitions

197

"""

198

199

def alter_consumer_group_offsets(self, group_topic_partitions, **kwargs):

200

"""

201

Alter consumer group offsets.

202

203

Args:

204

group_topic_partitions (list): List of ConsumerGroupTopicPartitions

205

**kwargs: Additional options (request_timeout)

206

207

Returns:

208

dict: Future objects keyed by ConsumerGroupTopicPartitions

209

"""

210

211

def describe_user_scram_credentials(self, users=None, **kwargs):

212

"""

213

Describe SCRAM credentials for users.

214

215

Args:

216

users (list, optional): List of usernames (None for all)

217

**kwargs: Additional options (request_timeout)

218

219

Returns:

220

dict: Future objects keyed by username

221

"""

222

223

def alter_user_scram_credentials(self, alterations, **kwargs):

224

"""

225

Alter SCRAM credentials for users.

226

227

Args:

228

alterations (list): List of UserScramCredentialAlteration objects

229

**kwargs: Additional options (request_timeout)

230

231

Returns:

232

dict: Future objects keyed by username

233

"""

234

235

def describe_cluster(self, **kwargs):

236

"""

237

Describe cluster information.

238

239

Args:

240

**kwargs: Additional options (request_timeout, include_authorized_operations)

241

242

Returns:

243

concurrent.futures.Future: Future with DescribeClusterResult

244

"""

245

246

def list_offsets(self, topic_partition_offsets, **kwargs):

247

"""

248

List offsets for topic partitions.

249

250

Args:

251

topic_partition_offsets (dict): Dict of TopicPartition to OffsetSpec

252

**kwargs: Additional options (request_timeout, isolation_level)

253

254

Returns:

255

dict: Future objects keyed by TopicPartition

256

"""

257

258

def delete_records(self, topic_partition_offsets, **kwargs):

259

"""

260

Delete records before specified offsets.

261

262

Args:

263

topic_partition_offsets (dict): Dict of TopicPartition to offset

264

**kwargs: Additional options (request_timeout)

265

266

Returns:

267

dict: Future objects keyed by TopicPartition

268

"""

269

270

def elect_leaders(self, election_type, partitions=None, **kwargs):

271

"""

272

Elect leaders for topic partitions.

273

274

Args:

275

election_type (ElectionType): Type of election (PREFERRED or UNCLEAN)

276

partitions (list, optional): List of TopicPartition objects (None for all partitions)

277

**kwargs: Additional options (request_timeout)

278

279

Returns:

280

concurrent.futures.Future: Future with election results

281

"""

282

283

def set_sasl_credentials(self, username, password):

284

"""

285

Set SASL credentials for authentication.

286

287

Args:

288

username (str): SASL username

289

password (str): SASL password

290

"""

291

```

292

293

### Topic Management Classes

294

295

#### NewTopic

296

297

Specification for creating new topics.

298

299

```python { .api }

300

class NewTopic:

301

def __init__(self, topic, num_partitions=None, replication_factor=None, replica_assignment=None, config=None):

302

"""

303

Create NewTopic specification.

304

305

Args:

306

topic (str): Topic name

307

num_partitions (int, optional): Number of partitions

308

replication_factor (int, optional): Replication factor

309

replica_assignment (dict, optional): Manual replica assignment

310

config (dict, optional): Topic configuration

311

"""

312

313

@property

314

def topic(self):

315

"""Topic name."""

316

317

@property

318

def num_partitions(self):

319

"""Number of partitions."""

320

321

@property

322

def replication_factor(self):

323

"""Replication factor."""

324

325

@property

326

def replica_assignment(self):

327

"""Replica assignment."""

328

329

@property

330

def config(self):

331

"""Topic configuration."""

332

```

333

334

#### NewPartitions

335

336

Specification for adding partitions to existing topics.

337

338

```python { .api }

339

class NewPartitions:

340

def __init__(self, topic, new_total_count, replica_assignment=None):

341

"""

342

Create NewPartitions specification.

343

344

Args:

345

topic (str): Topic name

346

new_total_count (int): New total partition count

347

replica_assignment (list, optional): Replica assignment for new partitions

348

"""

349

350

@property

351

def topic(self):

352

"""Topic name."""

353

354

@property

355

def new_total_count(self):

356

"""New total partition count."""

357

358

@property

359

def replica_assignment(self):

360

"""Replica assignment."""

361

```

362

363

### Configuration Management Classes

364

365

#### ConfigResource

366

367

Represents a configuration resource.

368

369

```python { .api }

370

class ConfigResource:

371

def __init__(self, restype, name, incremental_configs=None):

372

"""

373

Create ConfigResource.

374

375

Args:

376

restype (int): Resource type (RESOURCE_TOPIC, RESOURCE_BROKER, etc.)

377

name (str): Resource name

378

incremental_configs (list, optional): Incremental configuration entries

379

"""

380

381

@property

382

def restype(self):

383

"""Resource type."""

384

385

@property

386

def name(self):

387

"""Resource name."""

388

389

@property

390

def incremental_configs(self):

391

"""Incremental configuration entries."""

392

393

def __hash__(self):

394

"""Hash for use in dicts."""

395

396

def __eq__(self, other):

397

"""Equality comparison."""

398

```

399

400

#### ConfigEntry

401

402

Represents a configuration entry.

403

404

```python { .api }

405

class ConfigEntry:

406

def __init__(self, name, value, incremental_operation=None):

407

"""

408

Create ConfigEntry.

409

410

Args:

411

name (str): Configuration name

412

value (str): Configuration value

413

incremental_operation (AlterConfigOpType, optional): Operation type for incremental updates

414

"""

415

416

@property

417

def name(self):

418

"""Configuration name."""

419

420

@property

421

def value(self):

422

"""Configuration value."""

423

424

@property

425

def incremental_operation(self):

426

"""Incremental operation type."""

427

428

@property

429

def source(self):

430

"""Configuration source."""

431

432

@property

433

def is_default(self):

434

"""Whether this is a default configuration."""

435

436

@property

437

def is_read_only(self):

438

"""Whether this configuration is read-only."""

439

440

@property

441

def is_sensitive(self):

442

"""Whether this configuration is sensitive."""

443

444

@property

445

def synonyms(self):

446

"""Configuration synonyms."""

447

```

448

449

### ACL Management Classes

450

451

#### AclBinding

452

453

Represents an ACL binding.

454

455

```python { .api }

456

class AclBinding:

457

def __init__(self, restype, name, resource_pattern_type, principal, host, operation, permission_type):

458

"""

459

Create AclBinding.

460

461

Args:

462

restype (ResourceType): Resource type

463

name (str): Resource name

464

resource_pattern_type (ResourcePatternType): Pattern type

465

principal (str): Principal (user/service)

466

host (str): Host pattern

467

operation (AclOperation): ACL operation

468

permission_type (AclPermissionType): Permission type

469

"""

470

471

@property

472

def restype(self):

473

"""Resource type."""

474

475

@property

476

def name(self):

477

"""Resource name."""

478

479

@property

480

def resource_pattern_type(self):

481

"""Resource pattern type."""

482

483

@property

484

def principal(self):

485

"""Principal."""

486

487

@property

488

def host(self):

489

"""Host pattern."""

490

491

@property

492

def operation(self):

493

"""ACL operation."""

494

495

@property

496

def permission_type(self):

497

"""Permission type."""

498

```

499

500

#### AclBindingFilter

501

502

Filter for ACL bindings.

503

504

```python { .api }

505

class AclBindingFilter:

506

def __init__(self, restype, name, resource_pattern_type, principal, host, operation, permission_type):

507

"""

508

Create AclBindingFilter.

509

510

Args:

511

restype (ResourceType): Resource type (can be ANY)

512

name (str): Resource name (can be None for any)

513

resource_pattern_type (ResourcePatternType): Pattern type (can be ANY)

514

principal (str): Principal (can be None for any)

515

host (str): Host pattern (can be None for any)

516

operation (AclOperation): ACL operation (can be ANY)

517

permission_type (AclPermissionType): Permission type (can be ANY)

518

"""

519

```

520

521

### Consumer Group Management Classes

522

523

#### ConsumerGroupListing

524

525

Information about a consumer group.

526

527

```python { .api }

528

class ConsumerGroupListing:

529

@property

530

def group_id(self):

531

"""Consumer group ID."""

532

533

@property

534

def is_simple_consumer_group(self):

535

"""Whether this is a simple consumer group."""

536

537

@property

538

def state(self):

539

"""Consumer group state."""

540

541

@property

542

def type(self):

543

"""Consumer group type."""

544

```

545

546

#### ConsumerGroupDescription

547

548

Detailed description of a consumer group.

549

550

```python { .api }

551

class ConsumerGroupDescription:

552

@property

553

def group_id(self):

554

"""Consumer group ID."""

555

556

@property

557

def is_simple_consumer_group(self):

558

"""Whether this is a simple consumer group."""

559

560

@property

561

def members(self):

562

"""List of group members."""

563

564

@property

565

def partition_assignor(self):

566

"""Partition assignor strategy."""

567

568

@property

569

def state(self):

570

"""Consumer group state."""

571

572

@property

573

def coordinator(self):

574

"""Group coordinator node."""

575

576

@property

577

def authorized_operations(self):

578

"""Authorized operations for this group."""

579

```

580

581

#### MemberDescription

582

583

Description of a consumer group member.

584

585

```python { .api }

586

class MemberDescription:

587

@property

588

def member_id(self):

589

"""Member ID."""

590

591

@property

592

def group_instance_id(self):

593

"""Group instance ID."""

594

595

@property

596

def client_id(self):

597

"""Client ID."""

598

599

@property

600

def host(self):

601

"""Member host."""

602

603

@property

604

def assignment(self):

605

"""Member assignment."""

606

```

607

608

### SCRAM Credential Management

609

610

#### UserScramCredentialAlteration

611

612

Base class for SCRAM credential alterations.

613

614

```python { .api }

615

class UserScramCredentialAlteration:

616

def __init__(self, user):

617

"""

618

Base class for SCRAM credential alterations.

619

620

Args:

621

user (str): Username

622

"""

623

624

@property

625

def user(self):

626

"""Username."""

627

```

628

629

#### UserScramCredentialUpsertion

630

631

SCRAM credential creation or update.

632

633

```python { .api }

634

class UserScramCredentialUpsertion(UserScramCredentialAlteration):

635

def __init__(self, user, scram_credential_info):

636

"""

637

Create or update SCRAM credentials.

638

639

Args:

640

user (str): Username

641

scram_credential_info (ScramCredentialInfo): Credential information

642

"""

643

644

@property

645

def scram_credential_info(self):

646

"""SCRAM credential information."""

647

```

648

649

#### UserScramCredentialDeletion

650

651

SCRAM credential deletion.

652

653

```python { .api }

654

class UserScramCredentialDeletion(UserScramCredentialAlteration):

655

def __init__(self, user, mechanism):

656

"""

657

Delete SCRAM credentials.

658

659

Args:

660

user (str): Username

661

mechanism (ScramMechanism): SCRAM mechanism to delete

662

"""

663

664

@property

665

def mechanism(self):

666

"""SCRAM mechanism."""

667

```

668

669

### Enumeration Classes

670

671

```python { .api }

672

class ResourceType:

673

UNKNOWN = 0

674

ANY = 1

675

TOPIC = 2

676

GROUP = 3

677

CLUSTER = 4

678

TRANSACTIONAL_ID = 5

679

DELEGATION_TOKEN = 6

680

USER = 7

681

682

class ResourcePatternType:

683

UNKNOWN = 0

684

ANY = 1

685

MATCH = 2

686

LITERAL = 3

687

PREFIXED = 4

688

689

class AclOperation:

690

UNKNOWN = 0

691

ANY = 1

692

ALL = 2

693

READ = 3

694

WRITE = 4

695

CREATE = 5

696

DELETE = 6

697

ALTER = 7

698

DESCRIBE = 8

699

CLUSTER_ACTION = 9

700

DESCRIBE_CONFIGS = 10

701

ALTER_CONFIGS = 11

702

IDEMPOTENT_WRITE = 12

703

704

class AclPermissionType:

705

UNKNOWN = 0

706

ANY = 1

707

DENY = 2

708

ALLOW = 3

709

710

class ConfigSource:

711

UNKNOWN_CONFIG = 0

712

DYNAMIC_TOPIC_CONFIG = 1

713

DYNAMIC_BROKER_CONFIG = 2

714

DYNAMIC_DEFAULT_BROKER_CONFIG = 3

715

STATIC_BROKER_CONFIG = 4

716

DEFAULT_CONFIG = 5

717

718

class AlterConfigOpType:

719

SET = 0

720

DELETE = 1

721

APPEND = 2

722

SUBTRACT = 3

723

724

class ScramMechanism:

725

SCRAM_SHA_256 = 0

726

SCRAM_SHA_512 = 1

727

728

class ElectionType:

729

PREFERRED = 0

730

UNCLEAN = 1

731

```

732

733

### Usage Examples

734

735

#### Creating Topics

736

737

```python

738

from confluent_kafka.admin import AdminClient, NewTopic

739

740

admin_client = AdminClient({'bootstrap.servers': 'localhost:9092'})

741

742

# Create topics

743

new_topics = [

744

NewTopic('my-topic-1', num_partitions=3, replication_factor=1),

745

NewTopic('my-topic-2', num_partitions=6, replication_factor=1, config={'cleanup.policy': 'compact'})

746

]

747

748

fs = admin_client.create_topics(new_topics, request_timeout=30)

749

750

# Wait for results

751

for topic, f in fs.items():

752

try:

753

f.result() # The result itself is None

754

print(f"Topic {topic} created")

755

except Exception as e:

756

print(f"Failed to create topic {topic}: {e}")

757

```

758

759

#### Managing Consumer Groups

760

761

```python

762

from confluent_kafka.admin import AdminClient

763

764

admin_client = AdminClient({'bootstrap.servers': 'localhost:9092'})

765

766

# List consumer groups

767

fs = admin_client.list_consumer_groups(request_timeout=10)

768

try:

769

result = fs.result()

770

for group_listing in result.valid:

771

print(f"Group: {group_listing.group_id}, State: {group_listing.state}")

772

except Exception as e:

773

print(f"Failed to list consumer groups: {e}")

774

775

# Describe specific consumer groups

776

group_ids = ['my-group-1', 'my-group-2']

777

fs = admin_client.describe_consumer_groups(group_ids, request_timeout=10)

778

779

for group_id, f in fs.items():

780

try:

781

group_desc = f.result()

782

print(f"Group {group_id}: {len(group_desc.members)} members")

783

for member in group_desc.members:

784

print(f" Member: {member.member_id} on {member.host}")

785

except Exception as e:

786

print(f"Failed to describe group {group_id}: {e}")

787

```

788

789

#### Managing ACLs

790

791

```python

792

from confluent_kafka.admin import AdminClient, AclBinding, AclBindingFilter

793

from confluent_kafka.admin import ResourceType, ResourcePatternType, AclOperation, AclPermissionType

794

795

admin_client = AdminClient({'bootstrap.servers': 'localhost:9092'})

796

797

# Create ACL binding

798

acl_binding = AclBinding(

799

restype=ResourceType.TOPIC,

800

name='my-topic',

801

resource_pattern_type=ResourcePatternType.LITERAL,

802

principal='User:alice',

803

host='*',

804

operation=AclOperation.READ,

805

permission_type=AclPermissionType.ALLOW

806

)

807

808

fs = admin_client.create_acls([acl_binding], request_timeout=10)

809

for acl, f in fs.items():

810

try:

811

f.result()

812

print(f"ACL created for {acl.principal}")

813

except Exception as e:

814

print(f"Failed to create ACL: {e}")

815

816

# List ACLs

817

acl_filter = AclBindingFilter(

818

restype=ResourceType.TOPIC,

819

name=None, # All topics

820

resource_pattern_type=ResourcePatternType.ANY,

821

principal=None, # All principals

822

host=None, # All hosts

823

operation=AclOperation.ANY,

824

permission_type=AclPermissionType.ANY

825

)

826

827

fs = admin_client.describe_acls(acl_filter, request_timeout=10)

828

try:

829

acl_bindings = fs.result()

830

for acl in acl_bindings:

831

print(f"ACL: {acl.principal} {acl.permission_type} {acl.operation} on {acl.name}")

832

except Exception as e:

833

print(f"Failed to list ACLs: {e}")

834

```