or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

admin.mdconnection.mdconsumer.mderrors.mdindex.mdproducer.mdserialization.mdstructs.md

admin.mddocs/

0

# Administrative Operations

1

2

Administrative client for managing Kafka cluster resources including topics, consumer groups, configurations, and access control lists (ACLs). Provides comprehensive cluster management capabilities.

3

4

## Capabilities

5

6

### KafkaAdminClient

7

8

Main administrative client for cluster management operations.

9

10

```python { .api }

11

class KafkaAdminClient:

12

def __init__(self, **configs):

13

"""

14

Create a KafkaAdminClient instance.

15

16

Args:

17

**configs: Admin client configuration options including:

18

bootstrap_servers (list): List of Kafka brokers

19

client_id (str): Client identifier

20

request_timeout_ms (int): Request timeout

21

connections_max_idle_ms (int): Connection idle timeout

22

retry_backoff_ms (int): Retry backoff time

23

security_protocol (str): Security protocol

24

ssl_context: SSL context

25

sasl_mechanism (str): SASL mechanism

26

sasl_plain_username (str): SASL username

27

sasl_plain_password (str): SASL password

28

"""

29

30

def create_topics(self, new_topics, timeout_ms=None, validate_only=False):

31

"""

32

Create new topics.

33

34

Args:

35

new_topics (list): List of NewTopic objects

36

timeout_ms (int): Operation timeout

37

validate_only (bool): Only validate, don't create

38

39

Returns:

40

dict: Dictionary mapping topic name to CreateTopicsResponse.topic_errors

41

"""

42

43

def delete_topics(self, topics, timeout_ms=None):

44

"""

45

Delete topics.

46

47

Args:

48

topics (list): List of topic names to delete

49

timeout_ms (int): Operation timeout

50

51

Returns:

52

dict: Dictionary mapping topic name to DeleteTopicsResponse.topic_errors

53

"""

54

55

def list_topics(self, timeout_ms=None):

56

"""

57

List all topics in cluster.

58

59

Args:

60

timeout_ms (int): Operation timeout

61

62

Returns:

63

ClusterMetadata: Cluster metadata with topic information

64

"""

65

66

def describe_topics(self, topics, timeout_ms=None):

67

"""

68

Get detailed information about topics.

69

70

Args:

71

topics (list): List of topic names

72

timeout_ms (int): Operation timeout

73

74

Returns:

75

dict: Dictionary mapping topic name to TopicMetadata

76

"""

77

78

def create_partitions(self, partition_updates, timeout_ms=None, validate_only=False):

79

"""

80

Add partitions to existing topics.

81

82

Args:

83

partition_updates (dict): Dictionary mapping topic name to NewPartitions

84

timeout_ms (int): Operation timeout

85

validate_only (bool): Only validate, don't create

86

87

Returns:

88

dict: Dictionary mapping topic name to CreatePartitionsResponse.topic_errors

89

"""

90

91

def describe_configs(self, config_resources, timeout_ms=None, include_synonyms=False):

92

"""

93

Get configuration for resources.

94

95

Args:

96

config_resources (list): List of ConfigResource objects

97

timeout_ms (int): Operation timeout

98

include_synonyms (bool): Include config synonyms

99

100

Returns:

101

dict: Dictionary mapping ConfigResource to DescribeConfigsResponse.resources

102

"""

103

104

def alter_configs(self, config_updates, timeout_ms=None, validate_only=False):

105

"""

106

Alter configuration for resources.

107

108

Args:

109

config_updates (dict): Dictionary mapping ConfigResource to config changes

110

timeout_ms (int): Operation timeout

111

validate_only (bool): Only validate, don't alter

112

113

Returns:

114

dict: Dictionary mapping ConfigResource to AlterConfigsResponse.resources

115

"""

116

117

def list_consumer_groups(self, timeout_ms=None):

118

"""

119

List consumer groups in cluster.

120

121

Args:

122

timeout_ms (int): Operation timeout

123

124

Returns:

125

list: List of GroupInformation objects

126

"""

127

128

def describe_consumer_groups(self, group_ids, timeout_ms=None):

129

"""

130

Get detailed information about consumer groups.

131

132

Args:

133

group_ids (list): List of consumer group IDs

134

timeout_ms (int): Operation timeout

135

136

Returns:

137

dict: Dictionary mapping group ID to GroupInformation

138

"""

139

140

def delete_consumer_groups(self, group_ids, timeout_ms=None):

141

"""

142

Delete consumer groups.

143

144

Args:

145

group_ids (list): List of consumer group IDs to delete

146

timeout_ms (int): Operation timeout

147

148

Returns:

149

dict: Dictionary mapping group ID to delete response

150

"""

151

152

def list_consumer_group_offsets(self, group_id, partitions=None, timeout_ms=None):

153

"""

154

Get committed offsets for consumer group.

155

156

Args:

157

group_id (str): Consumer group ID

158

partitions (list): List of TopicPartition objects (None = all)

159

timeout_ms (int): Operation timeout

160

161

Returns:

162

dict: Dictionary mapping TopicPartition to OffsetAndMetadata

163

"""

164

165

def alter_consumer_group_offsets(self, group_id, offsets, timeout_ms=None):

166

"""

167

Alter committed offsets for consumer group.

168

169

Args:

170

group_id (str): Consumer group ID

171

offsets (dict): Dictionary mapping TopicPartition to OffsetAndMetadata

172

timeout_ms (int): Operation timeout

173

174

Returns:

175

dict: Dictionary mapping TopicPartition to alter response

176

"""

177

178

def create_acls(self, acls, timeout_ms=None):

179

"""

180

Create access control lists.

181

182

Args:

183

acls (list): List of ACL objects

184

timeout_ms (int): Operation timeout

185

186

Returns:

187

list: List of CreateAclsResponse.creation_responses

188

"""

189

190

def describe_acls(self, acl_filter, timeout_ms=None):

191

"""

192

Describe access control lists matching filter.

193

194

Args:

195

acl_filter (ACLFilter): Filter for ACLs to describe

196

timeout_ms (int): Operation timeout

197

198

Returns:

199

list: List of ACL objects matching filter

200

"""

201

202

def delete_acls(self, acl_filters, timeout_ms=None):

203

"""

204

Delete access control lists matching filters.

205

206

Args:

207

acl_filters (list): List of ACLFilter objects

208

timeout_ms (int): Operation timeout

209

210

Returns:

211

list: List of DeleteAclsResponse.filter_responses

212

"""

213

214

def describe_cluster(self, timeout_ms=None):

215

"""

216

Get cluster metadata including brokers and cluster ID.

217

218

Args:

219

timeout_ms (int): Operation timeout

220

221

Returns:

222

ClusterMetadata: Cluster information including brokers and cluster ID

223

"""

224

225

def describe_log_dirs(self, broker_ids=None, timeout_ms=None):

226

"""

227

Describe log directories on brokers.

228

229

Args:

230

broker_ids (list): List of broker IDs (None = all brokers)

231

timeout_ms (int): Operation timeout

232

233

Returns:

234

dict: Dictionary mapping broker ID to log directory information

235

"""

236

237

def close(self):

238

"""Close the admin client and release resources."""

239

```

240

241

### Topic Management Types

242

243

Types for creating and managing topics.

244

245

```python { .api }

246

class NewTopic:

247

def __init__(self, name, num_partitions, replication_factor, replica_assignments=None, topic_configs=None):

248

"""

249

Specification for creating a new topic.

250

251

Args:

252

name (str): Topic name

253

num_partitions (int): Number of partitions

254

replication_factor (int): Replication factor

255

replica_assignments (dict): Custom replica assignments (optional)

256

topic_configs (dict): Topic configuration properties (optional)

257

"""

258

259

name: str

260

num_partitions: int

261

replication_factor: int

262

replica_assignments: dict

263

topic_configs: dict

264

265

class NewPartitions:

266

def __init__(self, total_count, new_assignments=None):

267

"""

268

Specification for adding partitions to existing topic.

269

270

Args:

271

total_count (int): New total partition count

272

new_assignments (list): Replica assignments for new partitions (optional)

273

"""

274

275

total_count: int

276

new_assignments: list

277

```

278

279

### Configuration Management Types

280

281

Types for managing resource configurations.

282

283

```python { .api }

284

class ConfigResource:

285

def __init__(self, resource_type, name, configs=None):

286

"""

287

Resource for configuration operations.

288

289

Args:

290

resource_type (ConfigResourceType): Type of resource

291

name (str): Resource name

292

configs (dict): Configuration properties (optional)

293

"""

294

295

resource_type: ConfigResourceType

296

name: str

297

configs: dict

298

299

class ConfigResourceType:

300

BROKER = 4 # Broker configuration

301

TOPIC = 2 # Topic configuration

302

```

303

304

### Access Control Types

305

306

Types for managing access control lists (ACLs).

307

308

```python { .api }

309

class ACL:

310

def __init__(self, principal, host, operation, permission_type, resource_pattern):

311

"""

312

Access control list entry.

313

314

Args:

315

principal (str): Principal (user/service account)

316

host (str): Host pattern

317

operation (ACLOperation): Operation type

318

permission_type (ACLPermissionType): Permission type

319

resource_pattern (ResourcePattern): Resource pattern

320

"""

321

322

principal: str

323

host: str

324

operation: ACLOperation

325

permission_type: ACLPermissionType

326

resource_pattern: ResourcePattern

327

328

class ACLFilter:

329

def __init__(self, principal=None, host=None, operation=None, permission_type=None, resource_pattern=None):

330

"""Filter for ACL operations."""

331

332

class ResourcePattern:

333

def __init__(self, resource_type, resource_name, pattern_type):

334

"""

335

Resource pattern for ACL matching.

336

337

Args:

338

resource_type (ResourceType): Type of resource

339

resource_name (str): Resource name/pattern

340

pattern_type (ACLResourcePatternType): Pattern matching type

341

"""

342

343

resource_type: ResourceType

344

resource_name: str

345

pattern_type: ACLResourcePatternType

346

347

class ACLOperation:

348

ANY = 1

349

ALL = 2

350

READ = 3

351

WRITE = 4

352

CREATE = 5

353

DELETE = 6

354

ALTER = 7

355

DESCRIBE = 8

356

CLUSTER_ACTION = 9

357

DESCRIBE_CONFIGS = 10

358

ALTER_CONFIGS = 11

359

IDEMPOTENT_WRITE = 12

360

361

class ACLPermissionType:

362

ANY = 1

363

DENY = 2

364

ALLOW = 3

365

366

class ResourceType:

367

UNKNOWN = 0

368

ANY = 1

369

CLUSTER = 4

370

DELEGATION_TOKEN = 6

371

GROUP = 3

372

TOPIC = 2

373

TRANSACTIONAL_ID = 5

374

375

class ACLResourcePatternType:

376

ANY = 1

377

MATCH = 2

378

LITERAL = 3

379

PREFIXED = 4

380

```

381

382

## Usage Examples

383

384

### Topic Management

385

386

```python

387

from kafka import KafkaAdminClient

388

from kafka.admin import NewTopic, NewPartitions

389

390

# Create admin client

391

admin = KafkaAdminClient(

392

bootstrap_servers=['localhost:9092'],

393

client_id='admin-client'

394

)

395

396

# Create topics

397

topics = [

398

NewTopic(

399

name='user-events',

400

num_partitions=6,

401

replication_factor=3,

402

topic_configs={

403

'cleanup.policy': 'compact',

404

'retention.ms': '604800000' # 7 days

405

}

406

),

407

NewTopic(

408

name='analytics',

409

num_partitions=12,

410

replication_factor=2

411

)

412

]

413

414

result = admin.create_topics(topics, timeout_ms=30000)

415

for topic, error in result.values():

416

if error is None:

417

print(f"Topic {topic} created successfully")

418

else:

419

print(f"Failed to create topic {topic}: {error}")

420

421

# Add partitions to existing topic

422

partition_updates = {

423

'user-events': NewPartitions(total_count=10)

424

}

425

admin.create_partitions(partition_updates)

426

427

# Delete topics

428

admin.delete_topics(['old-topic'], timeout_ms=30000)

429

430

admin.close()

431

```

432

433

### Configuration Management

434

435

```python

436

from kafka import KafkaAdminClient

437

from kafka.admin import ConfigResource, ConfigResourceType

438

439

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

440

441

# Get topic configuration

442

topic_resource = ConfigResource(ConfigResourceType.TOPIC, 'my-topic')

443

config_result = admin.describe_configs([topic_resource])

444

445

for resource, config_response in config_result.items():

446

print(f"Configuration for {resource.name}:")

447

for config in config_response.configs:

448

print(f" {config.name} = {config.value}")

449

450

# Alter topic configuration

451

config_updates = {

452

topic_resource: {

453

'retention.ms': '86400000', # 1 day

454

'segment.ms': '3600000' # 1 hour

455

}

456

}

457

admin.alter_configs(config_updates)

458

459

admin.close()

460

```

461

462

### Consumer Group Management

463

464

```python

465

from kafka import KafkaAdminClient

466

from kafka.structs import TopicPartition, OffsetAndMetadata

467

468

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

469

470

# List all consumer groups

471

groups = admin.list_consumer_groups()

472

for group in groups:

473

print(f"Group: {group.group}, State: {group.state}")

474

475

# Get detailed group information

476

group_details = admin.describe_consumer_groups(['my-consumer-group'])

477

for group_id, group_info in group_details.items():

478

print(f"Group {group_id}:")

479

print(f" State: {group_info.state}")

480

print(f" Protocol: {group_info.protocol}")

481

print(f" Members: {len(group_info.members)}")

482

483

# Get committed offsets

484

group_id = 'my-consumer-group'

485

offsets = admin.list_consumer_group_offsets(group_id)

486

for partition, offset_metadata in offsets.items():

487

print(f"{partition.topic}:{partition.partition} = {offset_metadata.offset}")

488

489

# Reset offsets

490

new_offsets = {

491

TopicPartition('my-topic', 0): OffsetAndMetadata(1000, 'reset'),

492

TopicPartition('my-topic', 1): OffsetAndMetadata(2000, 'reset')

493

}

494

admin.alter_consumer_group_offsets(group_id, new_offsets)

495

496

admin.close()

497

```

498

499

### Access Control Management

500

501

```python

502

from kafka import KafkaAdminClient

503

from kafka.admin import (ACL, ACLFilter, ResourcePattern,

504

ACLOperation, ACLPermissionType,

505

ResourceType, ACLResourcePatternType)

506

507

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

508

509

# Create ACLs

510

acls = [

511

ACL(

512

principal='User:alice',

513

host='*',

514

operation=ACLOperation.READ,

515

permission_type=ACLPermissionType.ALLOW,

516

resource_pattern=ResourcePattern(

517

resource_type=ResourceType.TOPIC,

518

resource_name='user-data',

519

pattern_type=ACLResourcePatternType.LITERAL

520

)

521

),

522

ACL(

523

principal='User:service-account',

524

host='*',

525

operation=ACLOperation.WRITE,

526

permission_type=ACLPermissionType.ALLOW,

527

resource_pattern=ResourcePattern(

528

resource_type=ResourceType.TOPIC,

529

resource_name='events-',

530

pattern_type=ACLResourcePatternType.PREFIXED

531

)

532

)

533

]

534

535

admin.create_acls(acls)

536

537

# List ACLs

538

acl_filter = ACLFilter(

539

resource_pattern=ResourcePatternFilter(

540

resource_type=ResourceType.TOPIC,

541

resource_name=None, # All topics

542

pattern_type=ACLResourcePatternType.ANY

543

)

544

)

545

existing_acls = admin.describe_acls(acl_filter)

546

for acl in existing_acls:

547

print(f"ACL: {acl.principal} {acl.permission_type} {acl.operation} on {acl.resource_pattern}")

548

549

admin.close()

550

```