or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

admin.md consumer.md errors.md index.md producer.md structures.md

docs/admin.md

0

# Administrative API

1

2

Administrative client for managing Kafka clusters including topic operations, partition management, configuration changes, access control lists, and consumer group administration.

3

4

## Capabilities

5

6

### KafkaAdminClient

7

8

Main administrative client providing comprehensive cluster management capabilities. Supports all Kafka administrative operations with proper error handling and timeouts.

9

10

```python { .api }

11

class KafkaAdminClient:

12

def __init__(self, **configs):

13

"""

14

Initialize admin client.

15

16

Configuration Parameters:

17

- bootstrap_servers: List[str], broker addresses

18

- client_id: str, client identifier

19

- connections_max_idle_ms: int, max idle time (default: 540000)

20

- request_timeout_ms: int, request timeout (default: 30000)

21

- retry_backoff_ms: int, retry backoff (default: 100)

22

- reconnect_backoff_ms: int, reconnect backoff (default: 50)

23

- security_protocol: str, 'PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'

24

- api_version: tuple, broker API version or 'auto'

25

"""

26

27

def create_topics(self, topic_requests, timeout_ms=None, validate_only=False):

28

"""

29

Create new topics.

30

31

Parameters:

32

- topic_requests: List[NewTopic], topic creation requests

33

- timeout_ms: int, operation timeout

34

- validate_only: bool, validate without creating

35

36

Returns:

37

- Dict[str, Future]: topic name to creation result future

38

"""

39

40

def delete_topics(self, topics, timeout_ms=None):

41

"""

42

Delete topics.

43

44

Parameters:

45

- topics: List[str], topic names to delete

46

- timeout_ms: int, operation timeout

47

48

Returns:

49

- Dict[str, Future]: topic name to deletion result future

50

"""

51

52

def list_topics(self, timeout_ms=None):

53

"""

54

List available topics.

55

56

Parameters:

57

- timeout_ms: int, operation timeout

58

59

Returns:

60

- Set[str]: available topic names

61

"""

62

63

def describe_topics(self, topics, timeout_ms=None):

64

"""

65

Get detailed topic information.

66

67

Parameters:

68

- topics: List[str], topic names to describe

69

- timeout_ms: int, operation timeout

70

71

Returns:

72

- Dict[str, TopicDescription]: topic descriptions

73

"""

74

75

def create_partitions(self, partition_updates, timeout_ms=None, validate_only=False):

76

"""

77

Add partitions to existing topics.

78

79

Parameters:

80

- partition_updates: Dict[str, NewPartitions], topic to partition updates

81

- timeout_ms: int, operation timeout

82

- validate_only: bool, validate without creating

83

84

Returns:

85

- Dict[str, Future]: topic name to result future

86

"""

87

88

def describe_configs(self, config_resources, timeout_ms=None):

89

"""

90

Get configuration for resources.

91

92

Parameters:

93

- config_resources: List[ConfigResource], resources to describe

94

- timeout_ms: int, operation timeout

95

96

Returns:

97

- Dict[ConfigResource, ConfigResourceResult]: configuration results

98

"""

99

100

def alter_configs(self, config_resources, timeout_ms=None):

101

"""

102

Modify configuration for resources.

103

104

Parameters:

105

- config_resources: Dict[ConfigResource, Dict[str, str]], config changes

106

- timeout_ms: int, operation timeout

107

108

Returns:

109

- Dict[ConfigResource, Future]: configuration change results

110

"""

111

112

def describe_acls(self, acl_filter, timeout_ms=None):

113

"""

114

Describe access control lists.

115

116

Parameters:

117

- acl_filter: ACLFilter, filter for ACL queries

118

- timeout_ms: int, operation timeout

119

120

Returns:

121

- List[ACLBinding]: matching ACL bindings

122

"""

123

124

def create_acls(self, acls, timeout_ms=None):

125

"""

126

Create access control lists.

127

128

Parameters:

129

- acls: List[ACL], ACLs to create

130

- timeout_ms: int, operation timeout

131

132

Returns:

133

- Dict[ACL, Future]: ACL creation results

134

"""

135

136

def delete_acls(self, acl_filters, timeout_ms=None):

137

"""

138

Delete access control lists.

139

140

Parameters:

141

- acl_filters: List[ACLFilter], filters for ACLs to delete

142

- timeout_ms: int, operation timeout

143

144

Returns:

145

- List[DeleteAclsResult]: deletion results

146

"""

147

148

def list_consumer_groups(self, timeout_ms=None):

149

"""

150

List consumer groups.

151

152

Parameters:

153

- timeout_ms: int, operation timeout

154

155

Returns:

156

- List[GroupInformation]: consumer group information

157

"""

158

159

def describe_consumer_groups(self, group_ids, timeout_ms=None):

160

"""

161

Get detailed consumer group information.

162

163

Parameters:

164

- group_ids: List[str], group IDs to describe

165

- timeout_ms: int, operation timeout

166

167

Returns:

168

- Dict[str, GroupDescription]: group descriptions

169

"""

170

171

def delete_consumer_groups(self, group_ids, timeout_ms=None):

172

"""

173

Delete consumer groups.

174

175

Parameters:

176

- group_ids: List[str], group IDs to delete

177

- timeout_ms: int, operation timeout

178

179

Returns:

180

- Dict[str, Future]: deletion results

181

"""

182

183

def close(self):

184

"""Close admin client and clean up resources."""

185

```

186

187

### Topic Management

188

189

Classes for creating and modifying topics.

190

191

```python { .api }

192

class NewTopic:

193

def __init__(self, name, num_partitions, replication_factor,

194

replica_assignments=None, topic_configs=None):

195

"""

196

Topic creation specification.

197

198

Parameters:

199

- name: str, topic name

200

- num_partitions: int, number of partitions

201

- replication_factor: int, replication factor

202

- replica_assignments: Dict[int, List[int]], manual replica assignments

203

- topic_configs: Dict[str, str], topic configuration overrides

204

"""

205

self.name = name

206

self.num_partitions = num_partitions

207

self.replication_factor = replication_factor

208

self.replica_assignments = replica_assignments or {}

209

self.topic_configs = topic_configs or {}

210

211

class NewPartitions:

212

def __init__(self, total_count, new_assignments=None):

213

"""

214

Partition addition specification.

215

216

Parameters:

217

- total_count: int, new total partition count

218

- new_assignments: List[List[int]], replica assignments for new partitions

219

"""

220

self.total_count = total_count

221

self.new_assignments = new_assignments

222

223

class TopicDescription:

224

name: str # Topic name

225

partitions: List[PartitionMetadata] # Partition metadata

226

is_internal: bool # Internal topic flag

227

authorized_operations: List[int] # Authorized operations

228

```

229

230

### Configuration Management

231

232

Classes for managing broker and topic configurations.

233

234

```python { .api }

235

class ConfigResource:

236

def __init__(self, resource_type, name, configs=None):

237

"""

238

Configuration resource specification.

239

240

Parameters:

241

- resource_type: ConfigResourceType, resource type

242

- name: str, resource name

243

- configs: Dict[str, str], configuration key-value pairs

244

"""

245

self.resource_type = resource_type

246

self.name = name

247

self.configs = configs or {}

248

249

class ConfigResourceType:

250

BROKER = 4 # Broker configuration

251

TOPIC = 2 # Topic configuration

252

253

class ConfigResourceResult:

254

configs: Dict[str, ConfigEntry] # Configuration entries

255

error_code: int # Error code (0 = success)

256

error_message: str # Error description

257

258

class ConfigEntry:

259

name: str # Configuration key

260

value: str # Configuration value

261

is_default: bool # Is default value

262

is_sensitive: bool # Is sensitive value

263

is_read_only: bool # Is read-only

264

synonyms: List['ConfigSynonym'] # Configuration synonyms

265

```

266

267

### Access Control Lists (ACL)

268

269

Classes for managing access control and authorization.

270

271

```python { .api }

272

class ACL:

273

def __init__(self, principal, host, operation, permission_type, resource_pattern):

274

"""

275

Access control list entry.

276

277

Parameters:

278

- principal: str, principal (user/service)

279

- host: str, host pattern

280

- operation: ACLOperation, operation type

281

- permission_type: ACLPermissionType, ALLOW or DENY

282

- resource_pattern: ResourcePattern, resource pattern

283

"""

284

self.principal = principal

285

self.host = host

286

self.operation = operation

287

self.permission_type = permission_type

288

self.resource_pattern = resource_pattern

289

290

class ACLFilter:

291

def __init__(self, principal=None, host=None, operation=None,

292

permission_type=None, resource_pattern_filter=None):

293

"""

294

ACL filter for queries (allows ANY values).

295

296

Parameters:

297

- principal: str|ACLFilter.ANY, principal filter

298

- host: str|ACLFilter.ANY, host filter

299

- operation: ACLOperation|ACLFilter.ANY, operation filter

300

- permission_type: ACLPermissionType|ACLFilter.ANY, permission filter

301

- resource_pattern_filter: ResourcePatternFilter, resource filter

302

"""

303

self.principal = principal

304

self.host = host

305

self.operation = operation

306

self.permission_type = permission_type

307

self.resource_pattern_filter = resource_pattern_filter

308

309

class ResourcePattern:

310

def __init__(self, resource_type, resource_name, pattern_type):

311

"""

312

Resource pattern specification.

313

314

Parameters:

315

- resource_type: ResourceType, type of resource

316

- resource_name: str, resource name pattern

317

- pattern_type: ACLResourcePatternType, pattern matching type

318

"""

319

self.resource_type = resource_type

320

self.resource_name = resource_name

321

self.pattern_type = pattern_type

322

323

class ResourcePatternFilter:

324

def __init__(self, resource_type=None, resource_name=None, pattern_type=None):

325

"""Resource pattern filter (allows ANY values)."""

326

self.resource_type = resource_type

327

self.resource_name = resource_name

328

self.pattern_type = pattern_type

329

```

330

331

### ACL Enumerations

332

333

```python { .api }

334

class ACLOperation:

335

ANY = -1

336

ALL = 0

337

READ = 1

338

WRITE = 2

339

CREATE = 3

340

DELETE = 4

341

ALTER = 5

342

DESCRIBE = 6

343

CLUSTER_ACTION = 7

344

DESCRIBE_CONFIGS = 8

345

ALTER_CONFIGS = 9

346

IDEMPOTENT_WRITE = 10

347

348

class ResourceType:

349

UNKNOWN = 0

350

ANY = 1

351

CLUSTER = 2

352

DELEGATION_TOKEN = 3

353

GROUP = 4

354

TOPIC = 5

355

TRANSACTIONAL_ID = 6

356

357

class ACLPermissionType:

358

ANY = 0

359

DENY = 1

360

ALLOW = 2

361

362

class ACLResourcePatternType:

363

ANY = 0

364

MATCH = 1

365

LITERAL = 2

366

PREFIXED = 3

367

```

368

369

### Consumer Group Management

370

371

Classes for consumer group administration.

372

373

```python { .api }

374

class GroupDescription:

375

group_id: str # Group ID

376

is_simple_consumer_group: bool # Simple consumer group flag

377

members: List[MemberDescription] # Group members

378

partition_assignor: str # Partition assignment strategy

379

state: str # Group state

380

coordinator: Node # Group coordinator

381

authorized_operations: List[int] # Authorized operations

382

383

class MemberDescription:

384

member_id: str # Member ID

385

client_id: str # Client ID

386

host: str # Client host

387

assignment: MemberAssignment # Partition assignment

388

389

class MemberAssignment:

390

topic_partitions: Set[TopicPartition] # Assigned partitions

391

```

392

393

## Usage Examples

394

395

### Topic Management

396

397

```python

398

from kafka import KafkaAdminClient

399

from kafka.admin import NewTopic, NewPartitions

400

from kafka.errors import TopicAlreadyExistsError, KafkaError

401

402

# Create admin client

403

admin = KafkaAdminClient(

404

bootstrap_servers=['localhost:9092'],

405

client_id='admin-client'

406

)

407

408

try:

409

# Create topics

410

topics = [

411

NewTopic(name='events', num_partitions=3, replication_factor=1),

412

NewTopic(name='logs', num_partitions=6, replication_factor=1,

413

topic_configs={'retention.ms': '86400000'}) # 1 day retention

414

]

415

416

create_result = admin.create_topics(topics, timeout_ms=30000)

417

418

# Wait for results

419

for topic, future in create_result.items():

420

try:

421

future.result() # Block until completion

422

print(f"Topic '{topic}' created successfully")

423

except TopicAlreadyExistsError:

424

print(f"Topic '{topic}' already exists")

425

except KafkaError as e:

426

print(f"Failed to create topic '{topic}': {e}")

427

428

# List topics

429

topics = admin.list_topics(timeout_ms=10000)

430

print(f"Available topics: {list(topics)}")

431

432

# Add partitions to existing topic

433

partition_updates = {

434

'events': NewPartitions(total_count=5) # Increase from 3 to 5 partitions

435

}

436

437

partition_result = admin.create_partitions(partition_updates, timeout_ms=30000)

438

for topic, future in partition_result.items():

439

try:

440

future.result()

441

print(f"Added partitions to topic '{topic}'")

442

except KafkaError as e:

443

print(f"Failed to add partitions to '{topic}': {e}")

444

445

finally:

446

admin.close()

447

```

448

449

### Topic Description and Metadata

450

451

```python

452

from kafka import KafkaAdminClient

453

454

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

455

456

try:

457

# Get detailed topic information

458

topic_descriptions = admin.describe_topics(['events', 'logs'], timeout_ms=10000)

459

460

for topic_name, description in topic_descriptions.items():

461

print(f"\nTopic: {topic_name}")

462

print(f"Internal: {description.is_internal}")

463

print(f"Partitions: {len(description.partitions)}")

464

465

for partition in description.partitions:

466

print(f" Partition {partition.partition}:")

467

print(f" Leader: {partition.leader}")

468

print(f" Replicas: {partition.replicas}")

469

print(f" ISR: {partition.isr}")

470

471

finally:

472

admin.close()

473

```

474

475

### Configuration Management

476

477

```python

478

from kafka import KafkaAdminClient

479

from kafka.admin import ConfigResource, ConfigResourceType
from kafka.errors import KafkaError

480

481

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

482

483

try:

484

# Describe topic configurations

485

topic_resource = ConfigResource(ConfigResourceType.TOPIC, 'events')

486

broker_resource = ConfigResource(ConfigResourceType.BROKER, '0')

487

488

config_results = admin.describe_configs([topic_resource, broker_resource],

489

timeout_ms=10000)

490

491

for resource, result in config_results.items():

492

print(f"\nConfigurations for {resource.resource_type} '{resource.name}':")

493

for name, entry in result.configs.items():

494

if not entry.is_default: # Only show non-default configs

495

print(f" {name} = {entry.value}")

496

497

# Alter topic configuration

498

config_updates = {

499

topic_resource: {

500

'retention.ms': '172800000', # 2 days

501

'cleanup.policy': 'delete'

502

}

503

}

504

505

alter_result = admin.alter_configs(config_updates, timeout_ms=30000)

506

for resource, future in alter_result.items():

507

try:

508

future.result()

509

print(f"Configuration updated for {resource.name}")

510

except KafkaError as e:

511

print(f"Failed to update configuration: {e}")

512

513

finally:

514

admin.close()

515

```

516

517

### Access Control Lists (ACL)

518

519

```python

520

from kafka import KafkaAdminClient

521

from kafka.admin import (ACL, ACLFilter, ResourcePattern, ResourcePatternFilter,

522

ACLOperation, ACLPermissionType, ResourceType,

523

ACLResourcePatternType)
from kafka.errors import KafkaError

524

525

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

526

527

try:

528

# Create ACLs

529

acls = [

530

ACL(

531

principal='User:alice',

532

host='*',

533

operation=ACLOperation.READ,

534

permission_type=ACLPermissionType.ALLOW,

535

resource_pattern=ResourcePattern(

536

resource_type=ResourceType.TOPIC,

537

resource_name='events',

538

pattern_type=ACLResourcePatternType.LITERAL

539

)

540

),

541

ACL(

542

principal='User:bob',

543

host='192.168.1.*',

544

operation=ACLOperation.WRITE,

545

permission_type=ACLPermissionType.ALLOW,

546

resource_pattern=ResourcePattern(

547

resource_type=ResourceType.TOPIC,

548

resource_name='logs-',  # PREFIXED matches every topic whose name starts with this literal prefix

549

pattern_type=ACLResourcePatternType.PREFIXED

550

)

551

)

552

]

553

554

create_result = admin.create_acls(acls, timeout_ms=30000)

555

for acl, future in create_result.items():

556

try:

557

future.result()

558

print(f"ACL created for {acl.principal}")

559

except KafkaError as e:

560

print(f"Failed to create ACL: {e}")

561

562

# List ACLs

563

acl_filter = ACLFilter(

564

resource_pattern_filter=ResourcePatternFilter(

565

resource_type=ResourceType.TOPIC

566

)

567

)

568

569

acl_bindings = admin.describe_acls(acl_filter, timeout_ms=10000)

570

print(f"\nFound {len(acl_bindings)} ACLs:")

571

for binding in acl_bindings:

572

print(f" {binding.principal} {binding.permission_type} "

573

f"{binding.operation} on {binding.pattern.resource_name}")

574

575

finally:

576

admin.close()

577

```

578

579

### Consumer Group Management

580

581

```python

582

from kafka import KafkaAdminClient
from kafka.errors import KafkaError

583

584

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

585

586

try:

587

# List consumer groups

588

groups = admin.list_consumer_groups(timeout_ms=10000)

589

print(f"Found {len(groups)} consumer groups:")

590

for group in groups:

591

print(f" Group: {group.group}, State: {group.state}")

592

593

# Describe specific consumer groups

594

group_ids = ['my-consumer-group', 'batch-processor']

595

descriptions = admin.describe_consumer_groups(group_ids, timeout_ms=10000)

596

597

for group_id, description in descriptions.items():

598

print(f"\nGroup: {group_id}")

599

print(f"State: {description.state}")

600

print(f"Coordinator: {description.coordinator}")

601

print(f"Assignment Strategy: {description.partition_assignor}")

602

print(f"Members: {len(description.members)}")

603

604

for member in description.members:

605

print(f" Member: {member.member_id}")

606

print(f" Client: {member.client_id}")

607

print(f" Host: {member.host}")

608

print(f" Partitions: {len(member.assignment.topic_partitions)}")

609

610

# Delete inactive consumer group

611

delete_result = admin.delete_consumer_groups(['inactive-group'], timeout_ms=30000)

612

for group_id, future in delete_result.items():

613

try:

614

future.result()

615

print(f"Consumer group '{group_id}' deleted")

616

except KafkaError as e:

617

print(f"Failed to delete group '{group_id}': {e}")

618

619

finally:

620

admin.close()

621

```

622

623

### Cluster Information

624

625

```python

626

from kafka import KafkaAdminClient, TopicPartition

627

from kafka.client_async import KafkaClient

628

629

# Using admin client for high-level operations

630

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

631

632

# Using low-level client for detailed cluster info

633

client = KafkaClient(bootstrap_servers=['localhost:9092'])

634

635

try:

636

# Wait for client to connect and load metadata

637

client.poll(timeout_ms=5000)

638

639

# Get cluster metadata

640

cluster = client.cluster

641

642

print("Cluster Information:")

643

print(f"Cluster ID: {cluster.cluster_id}")

644

print(f"Controller: {cluster.controller}")

645

646

print(f"\nBrokers ({len(cluster.brokers())}):")

647

for broker in cluster.brokers():

648

print(f" Broker {broker.nodeId}: {broker.host}:{broker.port}")

649

if broker.rack:

650

print(f" Rack: {broker.rack}")

651

652

print(f"\nTopics ({len(cluster.topics())}):")

653

for topic in sorted(cluster.topics()):

654

partitions = cluster.partitions_for_topic(topic)

655

print(f" {topic}: {len(partitions)} partitions")

656

657

for partition_id in sorted(partitions):

658

partition = cluster.leader_for_partition(TopicPartition(topic, partition_id))

659

print(f" Partition {partition_id}: Leader {partition}")

660

661

finally:

662

admin.close()

663

client.close()

664

```

665

666

### Batch Operations

667

668

```python

669

from kafka import KafkaAdminClient

670

from kafka.admin import NewTopic

671

import concurrent.futures

672

673

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

674

675

try:

676

# Create multiple topics concurrently

677

topics = [

678

NewTopic(f'partition-{i}', num_partitions=i+1, replication_factor=1)

679

for i in range(10)

680

]

681

682

create_result = admin.create_topics(topics, timeout_ms=60000)

683

684

# Process results as they complete

685

with concurrent.futures.ThreadPoolExecutor() as executor:

686

# Submit all futures

687

future_to_topic = {

688

executor.submit(future.result): topic_name

689

for topic_name, future in create_result.items()

690

}

691

692

# Process completed futures

693

for future in concurrent.futures.as_completed(future_to_topic, timeout=60):

694

topic_name = future_to_topic[future]

695

try:

696

future.result()

697

print(f"✓ Topic '{topic_name}' created")

698

except Exception as e:

699

print(f"✗ Topic '{topic_name}' failed: {e}")

700

701

finally:

702

admin.close()

703

```