or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
mavenpkg:maven/org.apache.kafka/kafka_2.13@4.1.x

docs

clients

admin.md
consumer.md
producer.md
index.md
tile.json

tessl/maven-org-apache-kafka--kafka-2-13

tessl install tessl/maven-org-apache-kafka--kafka-2-13@4.1.0

Apache Kafka is a distributed event streaming platform that combines publish-subscribe messaging, durable storage, and real-time stream processing capabilities.

admin.mddocs/clients/

Admin API

The Admin API manages Kafka resources programmatically. The Admin client is thread-safe and can be shared across multiple threads.

Core Interface

Admin

Main admin interface for managing Kafka resources.

package org.apache.kafka.clients.admin;

public interface Admin extends AutoCloseable {
    // Factory methods
    static Admin create(Properties props);
    static Admin create(Map<String, Object> conf);

    // Lifecycle
    void close();
    void close(Duration timeout);

    // Metrics and identification
    Map<MetricName, ? extends Metric> metrics();
    void registerMetricForSubscription(KafkaMetric metric);
    void unregisterMetricFromSubscription(KafkaMetric metric);
    Uuid clientInstanceId(Duration timeout);
}

Basic Usage:

import org.apache.kafka.clients.admin.*;
import java.util.Properties;

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

try (Admin admin = Admin.create(props)) {
    // Use admin client
}

Topic Operations

Create Topics

// Admin interface method
CreateTopicsResult createTopics(Collection<NewTopic> newTopics);
CreateTopicsResult createTopics(Collection<NewTopic> newTopics,
                               CreateTopicsOptions options);

NewTopic:

package org.apache.kafka.clients.admin;

public class NewTopic {
    // Constructor with partitions and replication factor
    public NewTopic(String name, int numPartitions, short replicationFactor);

    // Constructor with replica assignments
    public NewTopic(String name, Map<Integer, List<Integer>> replicasAssignments);

    // Fluent configuration
    public NewTopic configs(Map<String, String> configs);
}

CreateTopicsResult:

package org.apache.kafka.clients.admin;

public class CreateTopicsResult {
    // Future for all topics
    KafkaFuture<Void> all();

    // Future for individual topics
    Map<String, KafkaFuture<Void>> values();

    // Additional metadata
    KafkaFuture<Config> config(String topic);
    KafkaFuture<Uuid> topicId(String topic);
    KafkaFuture<Integer> numPartitions(String topic);
    KafkaFuture<Integer> replicationFactor(String topic);
}

Usage Example:

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.errors.TopicExistsException;
import java.util.*;
import java.util.concurrent.ExecutionException;

try (Admin admin = Admin.create(props)) {
    // Create topic with partitions and replication factor
    NewTopic newTopic = new NewTopic("my-topic", 3, (short) 2);

    // Add topic configuration
    Map<String, String> configs = new HashMap<>();
    configs.put("retention.ms", "86400000"); // 1 day
    configs.put("compression.type", "snappy");
    newTopic.configs(configs);

    CreateTopicsResult result = admin.createTopics(Collections.singletonList(newTopic));

    // Wait for completion; broker-side failures surface as the cause of an ExecutionException
    result.all().get();
    System.out.println("Topic created successfully");
} catch (ExecutionException e) {
    if (e.getCause() instanceof TopicExistsException) {
        // An already-existing topic is the only failure this example tolerates
        System.out.println("Topic already exists");
    } else {
        // Do not silently swallow other failures (authorization, invalid config, timeout, ...)
        throw new IllegalStateException("Failed to create topic", e);
    }
} catch (InterruptedException e) {
    // Restore the interrupt flag so callers further up the stack can observe it
    Thread.currentThread().interrupt();
    throw new IllegalStateException("Interrupted while creating topic", e);
}

Delete Topics

// Admin interface methods
DeleteTopicsResult deleteTopics(Collection<String> topics);
DeleteTopicsResult deleteTopics(TopicCollection topics);
DeleteTopicsResult deleteTopics(TopicCollection topics, DeleteTopicsOptions options);

DeleteTopicsResult:

package org.apache.kafka.clients.admin;

public class DeleteTopicsResult {
    // Future for all topics
    KafkaFuture<Void> all();

    // Futures by topic name
    Map<String, KafkaFuture<Void>> topicNameValues();

    // Futures by topic ID
    Map<Uuid, KafkaFuture<Void>> topicIdValues();
}

Usage Example:

import org.apache.kafka.clients.admin.*;
import java.util.*;

try (Admin admin = Admin.create(props)) {
    DeleteTopicsResult result = admin.deleteTopics(
        Arrays.asList("topic1", "topic2"));

    result.all().get();
    System.out.println("Topics deleted successfully");
}

List Topics

// Admin interface methods
ListTopicsResult listTopics();
ListTopicsResult listTopics(ListTopicsOptions options);

ListTopicsResult:

package org.apache.kafka.clients.admin;

public class ListTopicsResult {
    KafkaFuture<Map<String, TopicListing>> namesToListings();
    KafkaFuture<Collection<TopicListing>> listings();
    KafkaFuture<Set<String>> names();
}

TopicListing:

package org.apache.kafka.clients.admin;

public class TopicListing {
    public String name();
    public Uuid topicId();
    public boolean isInternal();
}

Usage Example:

import org.apache.kafka.clients.admin.*;
import java.util.*;

try (Admin admin = Admin.create(props)) {
    ListTopicsOptions options = new ListTopicsOptions()
        .listInternal(false); // Exclude internal topics

    ListTopicsResult result = admin.listTopics(options);

    Set<String> topicNames = result.names().get();
    System.out.println("Topics: " + topicNames);

    // Or get full listings
    Collection<TopicListing> listings = result.listings().get();
    for (TopicListing listing : listings) {
        System.out.println("Topic: " + listing.name() +
            ", Internal: " + listing.isInternal());
    }
}

Describe Topics

// Admin interface methods
DescribeTopicsResult describeTopics(Collection<String> topicNames);
DescribeTopicsResult describeTopics(TopicCollection topics);
DescribeTopicsResult describeTopics(TopicCollection topics, DescribeTopicsOptions options);

DescribeTopicsResult:

package org.apache.kafka.clients.admin;

public class DescribeTopicsResult {
    Map<String, KafkaFuture<TopicDescription>> topicNameValues();
    Map<Uuid, KafkaFuture<TopicDescription>> topicIdValues();
    KafkaFuture<Map<String, TopicDescription>> allTopicNames();
    KafkaFuture<Map<Uuid, TopicDescription>> allTopicIds();
}

TopicDescription:

package org.apache.kafka.clients.admin;

public class TopicDescription {
    public String name();
    public boolean isInternal();
    public List<TopicPartitionInfo> partitions();
    public Set<AclOperation> authorizedOperations();
    public Uuid topicId();
}

TopicPartitionInfo:

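Provides detailed information about a single partition within a topic. This class is distinct from PartitionInfo (used by producers and consumers): it does not carry the topic name, and it exposes the leader, replicas, in-sync replicas, and offline replicas as List<Node> rather than Node[] arrays.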

// Note: this class lives in org.apache.kafka.common, not in the admin package
package org.apache.kafka.common;

public class TopicPartitionInfo {
    public int partition();
    public Node leader(); // May be null when no leader is currently elected
    public List<Node> replicas();
    public List<Node> isr(); // In-sync replicas
    public List<Node> offlineReplicas();
}

Key Distinctions:

  • replicas(): All assigned replicas for this partition
  • isr(): In-sync replicas that are caught up with the leader
  • offlineReplicas(): Replicas that are currently offline
  • The leader may be null if no leader is currently elected

Usage Example:

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionInfo;
import java.util.*;

try (Admin admin = Admin.create(props)) {
    DescribeTopicsResult result = admin.describeTopics(
        Arrays.asList("my-topic"));

    Map<String, TopicDescription> descriptions = result.allTopicNames().get();
    for (Map.Entry<String, TopicDescription> entry : descriptions.entrySet()) {
        TopicDescription desc = entry.getValue();
        System.out.println("Topic: " + desc.name());
        System.out.println("Partitions: " + desc.partitions().size());
        System.out.println("Internal: " + desc.isInternal());

        for (TopicPartitionInfo partition : desc.partitions()) {
            // leader() is null while no leader is elected, so guard before calling id()
            Node leader = partition.leader();
            String leaderId = (leader == null) ? "none" : String.valueOf(leader.id());

            System.out.println("  Partition " + partition.partition() +
                ": leader=" + leaderId +
                ", replicas=" + partition.replicas().size() +
                ", isr=" + partition.isr().size());
        }
    }
}

Create Partitions

// Admin interface method
CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions);
CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions,
                                       CreatePartitionsOptions options);

NewPartitions:

package org.apache.kafka.clients.admin;

public class NewPartitions {
    // Increase partition count
    public static NewPartitions increaseTo(int totalCount);

    // Increase with specific replica assignments
    public static NewPartitions increaseTo(int totalCount,
                                          List<List<Integer>> newAssignments);
}

Usage Example:

import org.apache.kafka.clients.admin.*;
import java.util.*;

try (Admin admin = Admin.create(props)) {
    Map<String, NewPartitions> newPartitions = new HashMap<>();
    newPartitions.put("my-topic", NewPartitions.increaseTo(5));

    CreatePartitionsResult result = admin.createPartitions(newPartitions);
    result.all().get();
    System.out.println("Partitions increased successfully");
}

Configuration Operations

Describe Configs

// Admin interface method
DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources);
DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources,
                                     DescribeConfigsOptions options);

ConfigResource:

// Note: this class lives in org.apache.kafka.common.config, not in the admin package
package org.apache.kafka.common.config;

public class ConfigResource {
    public enum Type {
        BROKER,
        TOPIC,
        BROKER_LOGGER,
        CLIENT_METRICS,
        GROUP,
        UNKNOWN
    }

    public ConfigResource(Type type, String name);

    public Type type();
    public String name();
}

Config:

package org.apache.kafka.clients.admin;

public class Config {
    public Collection<ConfigEntry> entries();
    public ConfigEntry get(String name);
}

ConfigEntry:

package org.apache.kafka.clients.admin;

public class ConfigEntry {
    public enum ConfigType {
        UNKNOWN, BOOLEAN, STRING, INT, SHORT, LONG, DOUBLE, LIST, CLASS, PASSWORD
    }

    // Where the effective value of a config entry came from
    public enum ConfigSource {
        UNKNOWN, DYNAMIC_TOPIC_CONFIG, DYNAMIC_BROKER_LOGGER_CONFIG, DYNAMIC_BROKER_CONFIG,
        DYNAMIC_DEFAULT_BROKER_CONFIG, DYNAMIC_CLIENT_METRICS_CONFIG, DYNAMIC_GROUP_CONFIG,
        STATIC_BROKER_CONFIG, DEFAULT_CONFIG
    }

    public String name();
    public String value();
    public ConfigSource source();
    public boolean isSensitive();
    public boolean isReadOnly();
    public ConfigType type();
    public String documentation();
    public List<ConfigSynonym> synonyms();
}

Usage Example:

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.config.ConfigResource;
import java.util.*;

try (Admin admin = Admin.create(props)) {
    // Identify the resource whose configuration should be read
    ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");

    DescribeConfigsResult result = admin.describeConfigs(
        Collections.singletonList(resource));

    Map<ConfigResource, Config> configs = result.all().get();
    Config config = configs.get(resource);

    for (ConfigEntry entry : config.entries()) {
        System.out.println(entry.name() + " = " + entry.value() +
            " (source: " + entry.source() + ")");
    }
}

Alter Configs (Incremental)

// Admin interface method
AlterConfigsResult incrementalAlterConfigs(
    Map<ConfigResource, Collection<AlterConfigOp>> configs);
AlterConfigsResult incrementalAlterConfigs(
    Map<ConfigResource, Collection<AlterConfigOp>> configs,
    AlterConfigsOptions options);

AlterConfigOp:

package org.apache.kafka.clients.admin;

public class AlterConfigOp {
    public enum OpType {
        SET,      // Set value
        DELETE,   // Delete value
        APPEND,   // Append to list
        SUBTRACT  // Remove from list
    }

    public AlterConfigOp(ConfigEntry configEntry, OpType opType);

    public ConfigEntry configEntry();
    public OpType opType();
}

Usage Example:

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.config.ConfigResource;
import java.util.*;

try (Admin admin = Admin.create(props)) {
    ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");

    // Change retention and compression; entries not listed here are left untouched
    Collection<AlterConfigOp> ops = Arrays.asList(
        new AlterConfigOp(new ConfigEntry("retention.ms", "172800000"),
            AlterConfigOp.OpType.SET),
        new AlterConfigOp(new ConfigEntry("compression.type", "lz4"),
            AlterConfigOp.OpType.SET)
    );

    Map<ConfigResource, Collection<AlterConfigOp>> configs = new HashMap<>();
    configs.put(resource, ops);

    AlterConfigsResult result = admin.incrementalAlterConfigs(configs);
    result.all().get();
    System.out.println("Configuration updated successfully");
}

Consumer Group Operations

Describe Consumer Groups

// Admin interface method
DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> groupIds);
DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> groupIds,
                                                   DescribeConsumerGroupsOptions options);

ConsumerGroupDescription:

package org.apache.kafka.clients.admin;

public class ConsumerGroupDescription {
    public String groupId();
    public boolean isSimpleConsumerGroup();
    public Collection<MemberDescription> members();
    public String partitionAssignor();
    public GroupType type();        // org.apache.kafka.common.GroupType
    public GroupState groupState(); // org.apache.kafka.common.GroupState
    public Node coordinator();
    public Set<AclOperation> authorizedOperations();
    // Epochs are returned as Optional<Integer>
    public Optional<Integer> groupEpoch();
    public Optional<Integer> targetAssignmentEpoch();
}

MemberDescription:

package org.apache.kafka.clients.admin;

public class MemberDescription {
    public String memberId();
    public Optional<String> groupInstanceId();
    public String clientId();
    public String host();
    public MemberAssignment assignment();
    public Optional<MemberAssignment> targetAssignment();
    // Returned as Optionals rather than primitives
    public Optional<Integer> memberEpoch();
    public Optional<Boolean> upgraded();
}

Usage Example:

import org.apache.kafka.clients.admin.*;
import java.util.*;

try (Admin admin = Admin.create(props)) {
    DescribeConsumerGroupsResult result = admin.describeConsumerGroups(
        Arrays.asList("my-group"));

    Map<String, ConsumerGroupDescription> descriptions = result.all().get();
    for (Map.Entry<String, ConsumerGroupDescription> entry : descriptions.entrySet()) {
        ConsumerGroupDescription desc = entry.getValue();
        System.out.println("Group: " + desc.groupId());
        System.out.println("State: " + desc.groupState());
        System.out.println("Coordinator: " + desc.coordinator());
        System.out.println("Members: " + desc.members().size());

        for (MemberDescription member : desc.members()) {
            System.out.println("  Member: " + member.memberId() +
                ", client: " + member.clientId() +
                ", host: " + member.host());
        }
    }
}

List Consumer Groups

// Admin interface method
ListConsumerGroupsResult listConsumerGroups();
ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options);

Usage Example:

import org.apache.kafka.clients.admin.*;
import java.util.*;

try (Admin admin = Admin.create(props)) {
    ListConsumerGroupsResult result = admin.listConsumerGroups();

    Collection<ConsumerGroupListing> groups = result.all().get();
    for (ConsumerGroupListing group : groups) {
        // type() and groupState() return Optionals; unwrap them instead of printing "Optional[...]"
        String type = group.type().map(Object::toString).orElse("unknown");
        String state = group.groupState().map(Object::toString).orElse("unknown");

        System.out.println("Group: " + group.groupId() +
            ", Type: " + type +
            ", State: " + state);
    }
}

List Consumer Group Offsets

// Admin interface methods
ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId);
ListConsumerGroupOffsetsResult listConsumerGroupOffsets(
    Map<String, ListConsumerGroupOffsetsSpec> groupSpecs);
ListConsumerGroupOffsetsResult listConsumerGroupOffsets(
    Map<String, ListConsumerGroupOffsetsSpec> groupSpecs,
    ListConsumerGroupOffsetsOptions options);

ListConsumerGroupOffsetsSpec:

package org.apache.kafka.clients.admin;

public class ListConsumerGroupOffsetsSpec {
    public ListConsumerGroupOffsetsSpec();

    // Fluent setter (instance method, not a static factory):
    // restricts the listing to the given partitions; null means all partitions
    public ListConsumerGroupOffsetsSpec topicPartitions(
        Collection<TopicPartition> topicPartitions);

    // Partitions previously set, or null when all partitions are requested
    public Collection<TopicPartition> topicPartitions();
}

Usage Example:

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.*;

try (Admin admin = Admin.create(props)) {
    ListConsumerGroupOffsetsResult result =
        admin.listConsumerGroupOffsets("my-group");

    Map<TopicPartition, OffsetAndMetadata> offsets =
        result.partitionsToOffsetAndMetadata().get();

    for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
        OffsetAndMetadata committed = entry.getValue();
        if (committed == null) {
            // The map value is null when the group has no committed offset for the partition
            System.out.println("Partition: " + entry.getKey() + ", no committed offset");
            continue;
        }
        System.out.println("Partition: " + entry.getKey() +
            ", Offset: " + committed.offset() +
            ", Metadata: " + committed.metadata());
    }
}

Alter Consumer Group Offsets

// Admin interface method
AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets(
    String groupId,
    Map<TopicPartition, OffsetAndMetadata> offsets);
AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets(
    String groupId,
    Map<TopicPartition, OffsetAndMetadata> offsets,
    AlterConsumerGroupOffsetsOptions options);

Usage Example:

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.*;

try (Admin admin = Admin.create(props)) {
    // Reset offsets to specific values
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    offsets.put(new TopicPartition("my-topic", 0), new OffsetAndMetadata(100L));
    offsets.put(new TopicPartition("my-topic", 1), new OffsetAndMetadata(200L));

    AlterConsumerGroupOffsetsResult result =
        admin.alterConsumerGroupOffsets("my-group", offsets);

    result.all().get();
    System.out.println("Offsets reset successfully");
}

Delete Consumer Groups

// Admin interface method
DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupIds);
DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupIds,
                                               DeleteConsumerGroupsOptions options);

Remove Members from Consumer Group

// Admin interface method
RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroup(
    String groupId,
    RemoveMembersFromConsumerGroupOptions options);

Usage Example:

import org.apache.kafka.clients.admin.*;
import java.util.*;

try (Admin admin = Admin.create(props)) {
    RemoveMembersFromConsumerGroupOptions options =
        new RemoveMembersFromConsumerGroupOptions();

    // Remove all members (force rebalance)
    RemoveMembersFromConsumerGroupResult result =
        admin.removeMembersFromConsumerGroup("my-group", options);

    result.all().get();
    System.out.println("Members removed successfully");
}

Quota Management Operations

Describe Client Quotas

// Admin interface method
DescribeClientQuotasResult describeClientQuotas(ClientQuotaFilter filter);
DescribeClientQuotasResult describeClientQuotas(ClientQuotaFilter filter,
                                               DescribeClientQuotasOptions options);

ClientQuotaFilter:

package org.apache.kafka.common.quota;

public class ClientQuotaFilter {
    // Match entities with specified components (inclusive)
    public static ClientQuotaFilter contains(Collection<ClientQuotaFilterComponent> components);

    // Match only entities with exactly these components (strict)
    public static ClientQuotaFilter containsOnly(Collection<ClientQuotaFilterComponent> components);

    // Match all configured entities
    public static ClientQuotaFilter all();

    public Collection<ClientQuotaFilterComponent> components();
    public boolean strict();
}

ClientQuotaFilterComponent:

package org.apache.kafka.common.quota;

public class ClientQuotaFilterComponent {
    // Match exact entity name
    public static ClientQuotaFilterComponent ofEntity(String entityType, String entityName);

    // Match default entity
    public static ClientQuotaFilterComponent ofDefaultEntity(String entityType);

    // Match any entity of this type
    public static ClientQuotaFilterComponent ofEntityType(String entityType);

    public String entityType();
    public Optional<String> match();
}

ClientQuotaEntity:

package org.apache.kafka.common.quota;

public class ClientQuotaEntity {
    public static final String USER = "user";
    public static final String CLIENT_ID = "client-id";
    public static final String IP = "ip";

    public ClientQuotaEntity(Map<String, String> entries);

    public Map<String, String> entries();
}

DescribeClientQuotasResult:

package org.apache.kafka.clients.admin;

public class DescribeClientQuotasResult {
    // Returns map of entity to quota values
    KafkaFuture<Map<ClientQuotaEntity, Map<String, Double>>> entities();
}

Usage Example:

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.quota.*;
import java.util.*;

try (Admin admin = Admin.create(props)) {
    // Describe quotas for a specific user
    ClientQuotaFilterComponent userComponent =
        ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.USER, "user1");
    ClientQuotaFilter filter = ClientQuotaFilter.contains(
        Collections.singletonList(userComponent));

    DescribeClientQuotasResult result = admin.describeClientQuotas(filter);

    Map<ClientQuotaEntity, Map<String, Double>> quotas = result.entities().get();
    for (Map.Entry<ClientQuotaEntity, Map<String, Double>> entry : quotas.entrySet()) {
        System.out.println("Entity: " + entry.getKey().entries());
        for (Map.Entry<String, Double> quota : entry.getValue().entrySet()) {
            System.out.println("  " + quota.getKey() + ": " + quota.getValue());
        }
    }
}

Alter Client Quotas

// Admin interface method
AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> entries);
AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> entries,
                                         AlterClientQuotasOptions options);

ClientQuotaAlteration:

package org.apache.kafka.common.quota;

public class ClientQuotaAlteration {
    public ClientQuotaAlteration(ClientQuotaEntity entity, Collection<Op> ops);

    public ClientQuotaEntity entity();
    public Collection<Op> ops();

    public static class Op {
        // Set value to update, null to clear
        public Op(String key, Double value);

        public String key();
        public Double value();
    }
}

AlterClientQuotasResult:

package org.apache.kafka.clients.admin;

public class AlterClientQuotasResult {
    Map<ClientQuotaEntity, KafkaFuture<Void>> values();
    KafkaFuture<Void> all();
}

Usage Example:

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.quota.*;
import java.util.*;

try (Admin admin = Admin.create(props)) {
    // Set quotas for a specific user
    Map<String, String> entityEntries = new HashMap<>();
    entityEntries.put(ClientQuotaEntity.USER, "user1");
    ClientQuotaEntity entity = new ClientQuotaEntity(entityEntries);

    // Set producer and consumer byte rate quotas
    Collection<ClientQuotaAlteration.Op> ops = Arrays.asList(
        new ClientQuotaAlteration.Op("producer_byte_rate", 1024000.0), // 1MB/s
        new ClientQuotaAlteration.Op("consumer_byte_rate", 2048000.0)  // 2MB/s
    );

    ClientQuotaAlteration alteration = new ClientQuotaAlteration(entity, ops);

    AlterClientQuotasResult result = admin.alterClientQuotas(
        Collections.singletonList(alteration));

    result.all().get();
    System.out.println("Client quotas updated successfully");
}

SCRAM Credential Management

Describe User SCRAM Credentials

// Admin interface methods
DescribeUserScramCredentialsResult describeUserScramCredentials();
DescribeUserScramCredentialsResult describeUserScramCredentials(List<String> users);
DescribeUserScramCredentialsResult describeUserScramCredentials(List<String> users,
                                                               DescribeUserScramCredentialsOptions options);

DescribeUserScramCredentialsResult:

package org.apache.kafka.clients.admin;

public class DescribeUserScramCredentialsResult {
    // All user descriptions
    KafkaFuture<Map<String, UserScramCredentialsDescription>> all();

    // List of users with credentials
    KafkaFuture<List<String>> users();

    // Description for specific user
    KafkaFuture<UserScramCredentialsDescription> description(String userName);
}

UserScramCredentialsDescription:

package org.apache.kafka.clients.admin;

public class UserScramCredentialsDescription {
    public String name();
    public List<ScramCredentialInfo> credentialInfos();
}

ScramCredentialInfo:

package org.apache.kafka.clients.admin;

public class ScramCredentialInfo {
    public ScramCredentialInfo(ScramMechanism mechanism, int iterations);

    public ScramMechanism mechanism();
    public int iterations();
}

ScramMechanism:

package org.apache.kafka.clients.admin;

public enum ScramMechanism {
    UNKNOWN,        // Placeholder for mechanisms this client version does not recognize
    SCRAM_SHA_256,
    SCRAM_SHA_512;

    public byte type();
    public String mechanismName();
}

Usage Example:

import org.apache.kafka.clients.admin.*;
import java.util.*;

try (Admin admin = Admin.create(props)) {
    // Describe all users with SCRAM credentials
    DescribeUserScramCredentialsResult result =
        admin.describeUserScramCredentials();

    Map<String, UserScramCredentialsDescription> descriptions = result.all().get();
    for (Map.Entry<String, UserScramCredentialsDescription> entry : descriptions.entrySet()) {
        System.out.println("User: " + entry.getKey());
        for (ScramCredentialInfo info : entry.getValue().credentialInfos()) {
            System.out.println("  Mechanism: " + info.mechanism() +
                ", Iterations: " + info.iterations());
        }
    }
}

Alter User SCRAM Credentials

// Admin interface method
AlterUserScramCredentialsResult alterUserScramCredentials(
    List<UserScramCredentialAlteration> alterations);
AlterUserScramCredentialsResult alterUserScramCredentials(
    List<UserScramCredentialAlteration> alterations,
    AlterUserScramCredentialsOptions options);

UserScramCredentialUpsertion:

package org.apache.kafka.clients.admin;

public class UserScramCredentialUpsertion extends UserScramCredentialAlteration {
    // Constructor with auto-generated salt
    public UserScramCredentialUpsertion(String user,
                                       ScramCredentialInfo credentialInfo,
                                       String password);

    // Constructor with explicit salt
    public UserScramCredentialUpsertion(String user,
                                       ScramCredentialInfo credentialInfo,
                                       byte[] password,
                                       byte[] salt);

    public ScramCredentialInfo credentialInfo();
    public byte[] salt();
    public byte[] password();
}

UserScramCredentialDeletion:

package org.apache.kafka.clients.admin;

public class UserScramCredentialDeletion extends UserScramCredentialAlteration {
    public UserScramCredentialDeletion(String user, ScramMechanism mechanism);

    public ScramMechanism mechanism();
}

AlterUserScramCredentialsResult:

package org.apache.kafka.clients.admin;

public class AlterUserScramCredentialsResult {
    Map<String, KafkaFuture<Void>> values();
    KafkaFuture<Void> all();
}

Usage Example:

import org.apache.kafka.clients.admin.*;
import java.util.*;

try (Admin admin = Admin.create(props)) {
    // Create or update SCRAM credentials for a user
    ScramCredentialInfo credentialInfo =
        new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096);

    UserScramCredentialUpsertion upsertion =
        new UserScramCredentialUpsertion("user1", credentialInfo, "password123");

    AlterUserScramCredentialsResult result =
        admin.alterUserScramCredentials(Collections.singletonList(upsertion));

    result.all().get();
    System.out.println("SCRAM credentials updated successfully");

    // Delete SCRAM credentials for a user
    UserScramCredentialDeletion deletion =
        new UserScramCredentialDeletion("user2", ScramMechanism.SCRAM_SHA_256);

    AlterUserScramCredentialsResult deleteResult =
        admin.alterUserScramCredentials(Collections.singletonList(deletion));

    deleteResult.all().get();
    System.out.println("SCRAM credentials deleted successfully");
}

Delegation Token Management

Create Delegation Token

// Admin interface method
CreateDelegationTokenResult createDelegationToken();
CreateDelegationTokenResult createDelegationToken(CreateDelegationTokenOptions options);

CreateDelegationTokenOptions:

package org.apache.kafka.clients.admin;

public class CreateDelegationTokenOptions extends AbstractOptions<CreateDelegationTokenOptions> {
    // NOTE(review): the setter is spelled maxlifeTimeMs (lower-case "l") in the client API;
    // newer releases may also offer a maxLifetimeMs spelling - confirm against the 4.1 Javadoc
    public CreateDelegationTokenOptions maxlifeTimeMs(long maxLifeTimeMs);
    public CreateDelegationTokenOptions renewers(List<KafkaPrincipal> renewers);
}

CreateDelegationTokenResult:

package org.apache.kafka.clients.admin;

public class CreateDelegationTokenResult {
    KafkaFuture<DelegationToken> delegationToken();
}

DelegationToken:

package org.apache.kafka.common.security.token.delegation;

public class DelegationToken {
    public TokenInformation tokenInfo();
    public byte[] hmac();
    public String hmacAsBase64String();
}

TokenInformation:

package org.apache.kafka.common.security.token.delegation;

public class TokenInformation {
    public KafkaPrincipal owner();
    public List<KafkaPrincipal> renewers();
    public long issueTimestamp();
    public long maxTimestamp();
    public long expiryTimestamp();
    public String tokenId();
}

Usage Example:

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.security.token.delegation.*;
import java.util.*;

try (Admin admin = Admin.create(props)) {
    // Create a delegation token
    // NOTE(review): setter is spelled maxlifeTimeMs (lower-case "l") - confirm against the 4.1 Javadoc
    CreateDelegationTokenOptions options = new CreateDelegationTokenOptions()
        .maxlifeTimeMs(86400000L); // 1 day

    CreateDelegationTokenResult result = admin.createDelegationToken(options);

    DelegationToken token = result.delegationToken().get();
    System.out.println("Token ID: " + token.tokenInfo().tokenId());
    System.out.println("HMAC: " + token.hmacAsBase64String());
    System.out.println("Owner: " + token.tokenInfo().owner());
    System.out.println("Expiry: " + token.tokenInfo().expiryTimestamp());
}

Renew Delegation Token

// Admin interface method
RenewDelegationTokenResult renewDelegationToken(byte[] hmac);
RenewDelegationTokenResult renewDelegationToken(byte[] hmac,
                                               RenewDelegationTokenOptions options);

RenewDelegationTokenResult:

package org.apache.kafka.clients.admin;

public class RenewDelegationTokenResult {
    KafkaFuture<Long> expiryTimestamp();
}

Usage Example:

import org.apache.kafka.clients.admin.*;
import java.util.*;

try (Admin admin = Admin.create(props)) {
    byte[] hmac = ...; // HMAC from token creation

    RenewDelegationTokenResult result = admin.renewDelegationToken(hmac);

    long newExpiryTime = result.expiryTimestamp().get();
    System.out.println("Token renewed. New expiry: " + newExpiryTime);
}

Expire Delegation Token

// Admin interface method
ExpireDelegationTokenResult expireDelegationToken(byte[] hmac);
ExpireDelegationTokenResult expireDelegationToken(byte[] hmac,
                                                 ExpireDelegationTokenOptions options);

ExpireDelegationTokenOptions:

package org.apache.kafka.clients.admin;

public class ExpireDelegationTokenOptions extends AbstractOptions<ExpireDelegationTokenOptions> {
    public ExpireDelegationTokenOptions expiryTimePeriodMs(long expiryTimePeriodMs);
}

ExpireDelegationTokenResult:

package org.apache.kafka.clients.admin;

public class ExpireDelegationTokenResult {
    KafkaFuture<Long> expiryTimestamp();
}

Usage Example:

import org.apache.kafka.clients.admin.*;
import java.util.*;

try (Admin admin = Admin.create(props)) {
    byte[] hmac = ...; // HMAC from token creation

    // Expire immediately
    ExpireDelegationTokenOptions options = new ExpireDelegationTokenOptions()
        .expiryTimePeriodMs(0);

    ExpireDelegationTokenResult result = admin.expireDelegationToken(hmac, options);

    long expiryTime = result.expiryTimestamp().get();
    System.out.println("Token expired at: " + expiryTime);
}

Describe Delegation Token

// Admin interface method
DescribeDelegationTokenResult describeDelegationToken();
DescribeDelegationTokenResult describeDelegationToken(DescribeDelegationTokenOptions options);

DescribeDelegationTokenOptions:

package org.apache.kafka.clients.admin;

public class DescribeDelegationTokenOptions extends AbstractOptions<DescribeDelegationTokenOptions> {
    public DescribeDelegationTokenOptions owners(List<KafkaPrincipal> owners);
}

DescribeDelegationTokenResult:

package org.apache.kafka.clients.admin;

public class DescribeDelegationTokenResult {
    KafkaFuture<List<DelegationToken>> delegationTokens();
}

Usage Example:

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.security.token.delegation.*;
import java.util.*;

try (Admin admin = Admin.create(props)) {
    DescribeDelegationTokenResult result = admin.describeDelegationToken();

    List<DelegationToken> tokens = result.delegationTokens().get();
    for (DelegationToken token : tokens) {
        TokenInformation info = token.tokenInfo();
        System.out.println("Token ID: " + info.tokenId());
        System.out.println("  Owner: " + info.owner());
        System.out.println("  Expiry: " + info.expiryTimestamp());
        System.out.println("  Renewers: " + info.renewers());
    }
}

Replica Management

Describe Replica Log Directories

// Admin interface method
DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas);
DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas,
                                                   DescribeReplicaLogDirsOptions options);

TopicPartitionReplica:

package org.apache.kafka.common;

public class TopicPartitionReplica {
    public TopicPartitionReplica(String topic, int partition, int brokerId);

    public String topic();
    public int partition();
    public int brokerId();
}

DescribeReplicaLogDirsResult:

package org.apache.kafka.clients.admin;

public class DescribeReplicaLogDirsResult {
    Map<TopicPartitionReplica, KafkaFuture<ReplicaLogDirInfo>> values();
    KafkaFuture<Map<TopicPartitionReplica, ReplicaLogDirInfo>> all();
}

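ReplicaLogDirInfo (static nested class of DescribeReplicaLogDirsResult; reference it as DescribeReplicaLogDirsResult.ReplicaLogDirInfo):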

package org.apache.kafka.clients.admin;

public class ReplicaLogDirInfo {
    public String getCurrentReplicaLogDir();
    public long getCurrentReplicaOffsetLag();
    public String getFutureReplicaLogDir();
    public long getFutureReplicaOffsetLag();
}

Usage Example:

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartitionReplica;
import java.util.*;

try (Admin admin = Admin.create(props)) {
    // Describe log directories for specific replicas
    TopicPartitionReplica replica1 = new TopicPartitionReplica("my-topic", 0, 1);
    TopicPartitionReplica replica2 = new TopicPartitionReplica("my-topic", 1, 1);

    Collection<TopicPartitionReplica> replicas = Arrays.asList(replica1, replica2);

    DescribeReplicaLogDirsResult result = admin.describeReplicaLogDirs(replicas);

    Map<TopicPartitionReplica, DescribeReplicaLogDirsResult.ReplicaLogDirInfo> logDirs =
        result.all().get();

    for (Map.Entry<TopicPartitionReplica, DescribeReplicaLogDirsResult.ReplicaLogDirInfo> entry :
         logDirs.entrySet()) {
        TopicPartitionReplica replica = entry.getKey();
        DescribeReplicaLogDirsResult.ReplicaLogDirInfo info = entry.getValue();

        System.out.println("Replica: " + replica.topic() + "-" + replica.partition() +
            " on broker " + replica.brokerId());
        System.out.println("  Current log dir: " + info.getCurrentReplicaLogDir());
        System.out.println("  Current offset lag: " + info.getCurrentReplicaOffsetLag());

        if (info.getFutureReplicaLogDir() != null) {
            System.out.println("  Future log dir: " + info.getFutureReplicaLogDir());
            System.out.println("  Future offset lag: " + info.getFutureReplicaOffsetLag());
        }
    }
}

Alter Replica Log Directories

// Admin interface method
AlterReplicaLogDirsResult alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment);
AlterReplicaLogDirsResult alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment,
                                             AlterReplicaLogDirsOptions options);

AlterReplicaLogDirsResult:

package org.apache.kafka.clients.admin;

public class AlterReplicaLogDirsResult {
    Map<TopicPartitionReplica, KafkaFuture<Void>> values();
    KafkaFuture<Void> all();
}

Usage Example:

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartitionReplica;
import java.util.*;

try (Admin admin = Admin.create(props)) {
    // Move replicas to specific log directories
    Map<TopicPartitionReplica, String> replicaAssignment = new HashMap<>();

    TopicPartitionReplica replica1 = new TopicPartitionReplica("my-topic", 0, 1);
    replicaAssignment.put(replica1, "/data/kafka-logs-1");

    TopicPartitionReplica replica2 = new TopicPartitionReplica("my-topic", 1, 1);
    replicaAssignment.put(replica2, "/data/kafka-logs-2");

    AlterReplicaLogDirsResult result = admin.alterReplicaLogDirs(replicaAssignment);

    result.all().get();
    System.out.println("Replica log directories altered successfully");
}

Partition Reassignment

Alter Partition Reassignments

// Admin interface method
AlterPartitionReassignmentsResult alterPartitionReassignments(
    Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments);
AlterPartitionReassignmentsResult alterPartitionReassignments(
    Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments,
    AlterPartitionReassignmentsOptions options);

NewPartitionReassignment:

package org.apache.kafka.clients.admin;

public class NewPartitionReassignment {
    public NewPartitionReassignment(List<Integer> targetReplicas);

    public List<Integer> targetReplicas();
}

AlterPartitionReassignmentsResult:

package org.apache.kafka.clients.admin;

public class AlterPartitionReassignmentsResult {
    Map<TopicPartition, KafkaFuture<Void>> values();
    KafkaFuture<Void> all();
}

Usage Example:

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartition;
import java.util.*;

try (Admin admin = Admin.create(props)) {
    // Reassign partition replicas to different brokers
    TopicPartition partition = new TopicPartition("my-topic", 0);

    // Move partition 0 to brokers 1, 2, and 3
    NewPartitionReassignment reassignment =
        new NewPartitionReassignment(Arrays.asList(1, 2, 3));

    Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments = new HashMap<>();
    reassignments.put(partition, Optional.of(reassignment));

    AlterPartitionReassignmentsResult result =
        admin.alterPartitionReassignments(reassignments);

    result.all().get();
    System.out.println("Partition reassignment initiated");

    // Cancel ongoing reassignment (pass empty Optional)
    Map<TopicPartition, Optional<NewPartitionReassignment>> cancelReassignments = new HashMap<>();
    cancelReassignments.put(partition, Optional.empty());

    AlterPartitionReassignmentsResult cancelResult =
        admin.alterPartitionReassignments(cancelReassignments);

    cancelResult.all().get();
    System.out.println("Partition reassignment cancelled");
}

List Partition Reassignments

// Admin interface method
ListPartitionReassignmentsResult listPartitionReassignments();
ListPartitionReassignmentsResult listPartitionReassignments(Set<TopicPartition> partitions);
ListPartitionReassignmentsResult listPartitionReassignments(Set<TopicPartition> partitions,
                                                           ListPartitionReassignmentsOptions options);

ListPartitionReassignmentsResult:

package org.apache.kafka.clients.admin;

public class ListPartitionReassignmentsResult {
    KafkaFuture<Map<TopicPartition, PartitionReassignment>> reassignments();
}

PartitionReassignment:

package org.apache.kafka.clients.admin;

public class PartitionReassignment {
    public List<Integer> replicas();
    public List<Integer> addingReplicas();
    public List<Integer> removingReplicas();
}

Usage Example:

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartition;
import java.util.*;

try (Admin admin = Admin.create(props)) {
    // List all ongoing partition reassignments
    ListPartitionReassignmentsResult result = admin.listPartitionReassignments();

    Map<TopicPartition, PartitionReassignment> reassignments =
        result.reassignments().get();

    if (reassignments.isEmpty()) {
        System.out.println("No ongoing partition reassignments");
    } else {
        for (Map.Entry<TopicPartition, PartitionReassignment> entry : reassignments.entrySet()) {
            TopicPartition partition = entry.getKey();
            PartitionReassignment reassignment = entry.getValue();

            System.out.println("Partition: " + partition);
            System.out.println("  Current replicas: " + reassignment.replicas());
            System.out.println("  Adding replicas: " + reassignment.addingReplicas());
            System.out.println("  Removing replicas: " + reassignment.removingReplicas());
        }
    }
}

Log Directory Operations

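Inspect log directories on brokers. Use this operation to query disk usage and to see which replicas live in each log directory. To move replicas between log directories on the same broker, use alterReplicaLogDirs (see "Alter Replica Log Directories" under Replica Management).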

Describe Log Directories

// Admin interface method
DescribeLogDirsResult describeLogDirs(Collection<Integer> brokers);
DescribeLogDirsResult describeLogDirs(Collection<Integer> brokers, DescribeLogDirsOptions options);

DescribeLogDirsResult:

package org.apache.kafka.clients.admin;

public class DescribeLogDirsResult {
    Map<Integer, KafkaFuture<Map<String, LogDirDescription>>> descriptions();
    KafkaFuture<Map<Integer, Map<String, LogDirDescription>>> allDescriptions();
}

LogDirDescription:

package org.apache.kafka.clients.admin;

public class LogDirDescription {
    ApiException error();
    Map<TopicPartition, ReplicaInfo> replicaInfos();
    OptionalLong totalBytes();
    OptionalLong usableBytes();
}

ReplicaInfo:

package org.apache.kafka.clients.admin;

public class ReplicaInfo {
    long size();
    long offsetLag();
    boolean isFuture();
}

Usage Example:

import org.apache.kafka.clients.admin.*;
import java.util.*;

try (Admin admin = Admin.create(props)) {
    // Query log directories for brokers 1 and 2
    DescribeLogDirsResult result = admin.describeLogDirs(Arrays.asList(1, 2));

    Map<Integer, Map<String, LogDirDescription>> allLogDirs = result.allDescriptions().get();

    for (Map.Entry<Integer, Map<String, LogDirDescription>> brokerEntry : allLogDirs.entrySet()) {
        int brokerId = brokerEntry.getKey();
        System.out.println("Broker " + brokerId + ":");

        for (Map.Entry<String, LogDirDescription> dirEntry : brokerEntry.getValue().entrySet()) {
            String path = dirEntry.getKey();
            LogDirDescription desc = dirEntry.getValue();

            System.out.println("  Log directory: " + path);
            if (desc.error() != null) {
                System.out.println("    Error: " + desc.error().getMessage());
            } else {
                System.out.println("    Total bytes: " + desc.totalBytes().orElse(-1));
                System.out.println("    Usable bytes: " + desc.usableBytes().orElse(-1));
                System.out.println("    Partitions: " + desc.replicaInfos().size());

                for (Map.Entry<TopicPartition, ReplicaInfo> replicaEntry :
                     desc.replicaInfos().entrySet()) {
                    TopicPartition tp = replicaEntry.getKey();
                    ReplicaInfo info = replicaEntry.getValue();
                    System.out.println("      " + tp + ": size=" + info.size() +
                        ", lag=" + info.offsetLag() + ", future=" + info.isFuture());
                }
            }
        }
    }
}

Leader Election

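Trigger leader elections for topic partitions. Use a preferred election to move leadership back to each partition's preferred replica (rebalancing leadership across brokers), or an unclean election to restore availability when no in-sync replica is available, at the risk of data loss.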

Elect Leaders

// Admin interface method
ElectLeadersResult electLeaders(ElectionType electionType, Set<TopicPartition> partitions);
ElectLeadersResult electLeaders(ElectionType electionType, Set<TopicPartition> partitions,
                               ElectLeadersOptions options);

ElectionType:

package org.apache.kafka.common;

public enum ElectionType {
    PREFERRED,  // Elect the preferred leader (first replica in replica list)
    UNCLEAN     // Allow election of out-of-sync replica if no ISR available
}

ElectLeadersResult:

package org.apache.kafka.clients.admin;

public class ElectLeadersResult {
    KafkaFuture<Map<TopicPartition, Optional<Throwable>>> partitions();
    KafkaFuture<Void> all();
}

Usage Example:

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.TopicPartition;
import java.util.*;

try (Admin admin = Admin.create(props)) {
    // Trigger preferred leader election for specific partitions
    Set<TopicPartition> partitions = new HashSet<>();
    partitions.add(new TopicPartition("my-topic", 0));
    partitions.add(new TopicPartition("my-topic", 1));

    ElectLeadersResult result = admin.electLeaders(
        ElectionType.PREFERRED,
        partitions
    );

    Map<TopicPartition, Optional<Throwable>> results = result.partitions().get();
    for (Map.Entry<TopicPartition, Optional<Throwable>> entry : results.entrySet()) {
        if (entry.getValue().isPresent()) {
            System.err.println("Failed to elect leader for " + entry.getKey() +
                ": " + entry.getValue().get().getMessage());
        } else {
            System.out.println("Successfully elected leader for " + entry.getKey());
        }
    }

    // Trigger preferred leader election for all partitions (pass null)
    ElectLeadersResult allResult = admin.electLeaders(ElectionType.PREFERRED, null);
    allResult.all().get();
    System.out.println("Triggered preferred leader election for all partitions");

    // Unclean leader election (use with caution - may result in data loss)
    ElectLeadersResult uncleanResult = admin.electLeaders(
        ElectionType.UNCLEAN,
        Collections.singleton(new TopicPartition("critical-topic", 0))
    );
    uncleanResult.all().get();
}

Record Operations

Delete records from topic partitions. For each partition, records with offsets smaller than the specified offset are deleted and the partition's low watermark advances accordingly, making those records unavailable to consumers.

Delete Records

// Admin interface method
DeleteRecordsResult deleteRecords(Map<TopicPartition, RecordsToDelete> recordsToDelete);
DeleteRecordsResult deleteRecords(Map<TopicPartition, RecordsToDelete> recordsToDelete,
                                 DeleteRecordsOptions options);

RecordsToDelete:

package org.apache.kafka.clients.admin;

public class RecordsToDelete {
    static RecordsToDelete beforeOffset(long offset);
    long beforeOffset();
}

DeleteRecordsResult:

package org.apache.kafka.clients.admin;

public class DeleteRecordsResult {
    Map<TopicPartition, KafkaFuture<DeletedRecords>> lowWatermarks();
    KafkaFuture<Void> all();
}

DeletedRecords:

package org.apache.kafka.clients.admin;

public class DeletedRecords {
    long lowWatermark();
}

Usage Example:

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartition;
import java.util.*;

try (Admin admin = Admin.create(props)) {
    // Delete all records before offset 1000 on partition 0
    TopicPartition partition = new TopicPartition("my-topic", 0);

    Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>();
    recordsToDelete.put(partition, RecordsToDelete.beforeOffset(1000L));

    DeleteRecordsResult result = admin.deleteRecords(recordsToDelete);

    // Check the new low watermark for each partition
    Map<TopicPartition, KafkaFuture<DeletedRecords>> results = result.lowWatermarks();
    for (Map.Entry<TopicPartition, KafkaFuture<DeletedRecords>> entry : results.entrySet()) {
        DeletedRecords deleted = entry.getValue().get();
        System.out.println("New low watermark for " + entry.getKey() +
            ": " + deleted.lowWatermark());
    }

    // Or wait for all to complete
    result.all().get();
    System.out.println("Records deleted successfully");
}

Cluster Operations

Describe Cluster

// Admin interface method
DescribeClusterResult describeCluster();
DescribeClusterResult describeCluster(DescribeClusterOptions options);

DescribeClusterResult:

package org.apache.kafka.clients.admin;

public class DescribeClusterResult {
    KafkaFuture<Collection<Node>> nodes();
    KafkaFuture<Node> controller();
    KafkaFuture<String> clusterId();
    KafkaFuture<Set<AclOperation>> authorizedOperations();
}

Usage Example:

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.Node;
import java.util.*;

try (Admin admin = Admin.create(props)) {
    DescribeClusterResult result = admin.describeCluster();

    String clusterId = result.clusterId().get();
    Node controller = result.controller().get();
    Collection<Node> nodes = result.nodes().get();

    System.out.println("Cluster ID: " + clusterId);
    System.out.println("Controller: " + controller.id() + " @ " +
        controller.host() + ":" + controller.port());
    System.out.println("Nodes: " + nodes.size());

    for (Node node : nodes) {
        System.out.println("  Node " + node.id() + ": " +
            node.host() + ":" + node.port());
    }
}

Offset Operations

List Offsets

// Admin interface method
ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets);
ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets,
                              ListOffsetsOptions options);

OffsetSpec:

package org.apache.kafka.clients.admin;

public class OffsetSpec {
    public static OffsetSpec latest();
    public static OffsetSpec earliest();
    public static OffsetSpec forTimestamp(long timestamp);
    public static OffsetSpec maxTimestamp();
    public static OffsetSpec earliestLocal();
    public static OffsetSpec latestTiered();
}

Usage Example:

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartition;
import java.util.*;

try (Admin admin = Admin.create(props)) {
    TopicPartition partition = new TopicPartition("my-topic", 0);

    Map<TopicPartition, OffsetSpec> offsetSpecs = new HashMap<>();
    offsetSpecs.put(partition, OffsetSpec.latest());

    ListOffsetsResult result = admin.listOffsets(offsetSpecs);

    Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsets =
        result.all().get();

    for (Map.Entry<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> entry :
         offsets.entrySet()) {
        System.out.println("Partition: " + entry.getKey() +
            ", Offset: " + entry.getValue().offset() +
            ", Timestamp: " + entry.getValue().timestamp());
    }
}

ACL Operations

Describe ACLs

// Admin interface method
DescribeAclsResult describeAcls(AclBindingFilter filter);
DescribeAclsResult describeAcls(AclBindingFilter filter, DescribeAclsOptions options);

Create ACLs

// Admin interface method
CreateAclsResult createAcls(Collection<AclBinding> acls);
CreateAclsResult createAcls(Collection<AclBinding> acls, CreateAclsOptions options);

Delete ACLs

// Admin interface method
DeleteAclsResult deleteAcls(Collection<AclBindingFilter> filters);
DeleteAclsResult deleteAcls(Collection<AclBindingFilter> filters,
                            DeleteAclsOptions options);

Transaction Operations

Describe Transactions

// Admin interface method
DescribeTransactionsResult describeTransactions(Collection<String> transactionalIds);
DescribeTransactionsResult describeTransactions(Collection<String> transactionalIds,
                                               DescribeTransactionsOptions options);

TransactionDescription:

package org.apache.kafka.clients.admin;

public class TransactionDescription {
    public String transactionalId();
    public long producerId();
    public int producerEpoch();
    public TransactionState state();
    public Node coordinator();
    public Set<TopicPartition> topicPartitions();
}

TransactionState:

package org.apache.kafka.clients.admin;

public enum TransactionState {
    ONGOING,
    PREPARE_ABORT,
    PREPARE_COMMIT,
    COMPLETE_ABORT,
    COMPLETE_COMMIT,
    EMPTY,
    PREPARE_EPOCH_FENCE,
    UNKNOWN
}

List Transactions

// Admin interface method
ListTransactionsResult listTransactions();
ListTransactionsResult listTransactions(ListTransactionsOptions options);

Abort Transaction

// Admin interface method
AbortTransactionResult abortTransaction(AbortTransactionSpec spec);
AbortTransactionResult abortTransaction(AbortTransactionSpec spec,
                                       AbortTransactionOptions options);

AbortTransactionSpec:

package org.apache.kafka.clients.admin;

public class AbortTransactionSpec {
    public AbortTransactionSpec(TopicPartition topicPartition,
                               long producerId,
                               short producerEpoch,
                               int coordinatorEpoch);
}

Terminate Transaction

// Admin interface method
TerminateTransactionResult terminateTransaction(TerminateTransactionSpec spec);
TerminateTransactionResult terminateTransaction(TerminateTransactionSpec spec,
                                               TerminateTransactionOptions options);

TerminateTransactionSpec:

package org.apache.kafka.clients.admin;

public class TerminateTransactionSpec {
    public TerminateTransactionSpec(String transactionalId,
                                   long producerId,
                                   short producerEpoch,
                                   int coordinatorEpoch);
}

Usage Example:

import org.apache.kafka.clients.admin.*;

try (Admin admin = Admin.create(props)) {
    // Terminate a hanging transaction
    TerminateTransactionSpec spec = new TerminateTransactionSpec(
        "my-transactional-id",
        12345L,  // Producer ID
        (short) 0,  // Producer epoch
        0  // Coordinator epoch
    );

    TerminateTransactionResult result = admin.terminateTransaction(spec);
    result.all().get();
    System.out.println("Transaction terminated");
}

Share Group Operations (Kafka 4.x)

Manage share consumer groups, a new feature introduced in Kafka 4.0 that allows multiple consumers to share partition consumption with automatic record acknowledgment and retry handling.

Describe Share Groups

// Admin interface method
DescribeShareGroupsResult describeShareGroups(Collection<String> groupIds);
DescribeShareGroupsResult describeShareGroups(Collection<String> groupIds,
                                             DescribeShareGroupsOptions options);

Usage Example:

import org.apache.kafka.clients.admin.*;
import java.util.*;

try (Admin admin = Admin.create(props)) {
    DescribeShareGroupsResult result = admin.describeShareGroups(
        Arrays.asList("share-group-1"));

    Map<String, ShareGroupDescription> descriptions = result.all().get();
    for (Map.Entry<String, ShareGroupDescription> entry : descriptions.entrySet()) {
        ShareGroupDescription desc = entry.getValue();
        System.out.println("Share Group: " + desc.groupId());
        System.out.println("  State: " + desc.state());
        System.out.println("  Members: " + desc.members().size());
    }
}

List Share Group Offsets

// Admin interface method
ListShareGroupOffsetsResult listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec> groupSpecs);
ListShareGroupOffsetsResult listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec> groupSpecs,
                                                 ListShareGroupOffsetsOptions options);

Usage Example:

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartition;
import java.util.*;

try (Admin admin = Admin.create(props)) {
    Map<String, ListShareGroupOffsetsSpec> specs = new HashMap<>();
    specs.put("share-group-1", new ListShareGroupOffsetsSpec());

    ListShareGroupOffsetsResult result = admin.listShareGroupOffsets(specs);

    Map<String, Map<TopicPartition, Long>> offsets = result.all().get();
    for (Map.Entry<String, Map<TopicPartition, Long>> groupEntry : offsets.entrySet()) {
        System.out.println("Group: " + groupEntry.getKey());
        for (Map.Entry<TopicPartition, Long> offsetEntry : groupEntry.getValue().entrySet()) {
            System.out.println("  " + offsetEntry.getKey() + ": " + offsetEntry.getValue());
        }
    }
}

Alter Share Group Offsets

// Admin interface method
AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId,
                                                   Map<TopicPartition, Long> offsets);
AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId,
                                                   Map<TopicPartition, Long> offsets,
                                                   AlterShareGroupOffsetsOptions options);

Usage Example:

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartition;
import java.util.*;

try (Admin admin = Admin.create(props)) {
    Map<TopicPartition, Long> newOffsets = new HashMap<>();
    newOffsets.put(new TopicPartition("my-topic", 0), 100L);
    newOffsets.put(new TopicPartition("my-topic", 1), 200L);

    AlterShareGroupOffsetsResult result =
        admin.alterShareGroupOffsets("share-group-1", newOffsets);

    result.all().get();
    System.out.println("Share group offsets updated");
}

Delete Share Groups

// Admin interface method
DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds);
DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds,
                                         DeleteShareGroupsOptions options);

Delete Share Group Offsets

// Admin interface method
DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId,
                                                     Set<TopicPartition> partitions);
DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId,
                                                     Set<TopicPartition> partitions,
                                                     DeleteShareGroupOffsetsOptions options);

Usage Example:

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartition;
import java.util.*;

try (Admin admin = Admin.create(props)) {
    // Delete offsets for specific partitions
    Set<TopicPartition> partitions = new HashSet<>();
    partitions.add(new TopicPartition("my-topic", 0));

    DeleteShareGroupOffsetsResult result =
        admin.deleteShareGroupOffsets("share-group-1", partitions);

    result.all().get();
    System.out.println("Share group offsets deleted");

    // Delete entire share group
    DeleteShareGroupsResult deleteResult =
        admin.deleteShareGroups(Collections.singletonList("share-group-1"));

    deleteResult.all().get();
    System.out.println("Share group deleted");
}

Streams Group Operations

Manage Kafka Streams application groups. These operations are similar to consumer group operations but specific to Kafka Streams applications.

Describe Streams Groups

// Admin interface method
DescribeStreamsGroupsResult describeStreamsGroups(Collection<String> groupIds);
DescribeStreamsGroupsResult describeStreamsGroups(Collection<String> groupIds,
                                                 DescribeStreamsGroupsOptions options);

Usage Example:

import org.apache.kafka.clients.admin.*;
import java.util.*;

try (Admin admin = Admin.create(props)) {
    DescribeStreamsGroupsResult result = admin.describeStreamsGroups(
        Arrays.asList("streams-app-1"));

    Map<String, StreamsGroupDescription> descriptions = result.all().get();
    for (Map.Entry<String, StreamsGroupDescription> entry : descriptions.entrySet()) {
        StreamsGroupDescription desc = entry.getValue();
        System.out.println("Streams Group: " + desc.groupId());
        System.out.println("  State: " + desc.state());
        System.out.println("  Members: " + desc.members().size());
    }
}

List Streams Group Offsets

// Admin interface method
ListStreamsGroupOffsetsResult listStreamsGroupOffsets(Map<String, ListStreamsGroupOffsetsSpec> groupSpecs);
ListStreamsGroupOffsetsResult listStreamsGroupOffsets(Map<String, ListStreamsGroupOffsetsSpec> groupSpecs,
                                                     ListStreamsGroupOffsetsOptions options);

Usage Example:

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.*;

try (Admin admin = Admin.create(props)) {
    Map<String, ListStreamsGroupOffsetsSpec> specs = new HashMap<>();
    specs.put("streams-app-1", new ListStreamsGroupOffsetsSpec());

    ListStreamsGroupOffsetsResult result = admin.listStreamsGroupOffsets(specs);

    Map<String, Map<TopicPartition, OffsetAndMetadata>> offsets = result.all().get();
    for (Map.Entry<String, Map<TopicPartition, OffsetAndMetadata>> groupEntry :
         offsets.entrySet()) {
        System.out.println("Group: " + groupEntry.getKey());
        for (Map.Entry<TopicPartition, OffsetAndMetadata> offsetEntry :
             groupEntry.getValue().entrySet()) {
            System.out.println("  " + offsetEntry.getKey() +
                ": offset=" + offsetEntry.getValue().offset() +
                ", metadata=" + offsetEntry.getValue().metadata());
        }
    }
}

Alter Streams Group Offsets

// Admin interface method
AlterStreamsGroupOffsetsResult alterStreamsGroupOffsets(String groupId,
                                                       Map<TopicPartition, OffsetAndMetadata> offsets);
AlterStreamsGroupOffsetsResult alterStreamsGroupOffsets(String groupId,
                                                       Map<TopicPartition, OffsetAndMetadata> offsets,
                                                       AlterStreamsGroupOffsetsOptions options);

Usage Example:

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.*;

try (Admin admin = Admin.create(props)) {
    Map<TopicPartition, OffsetAndMetadata> newOffsets = new HashMap<>();
    newOffsets.put(new TopicPartition("input-topic", 0), new OffsetAndMetadata(100L));
    newOffsets.put(new TopicPartition("input-topic", 1), new OffsetAndMetadata(200L));

    AlterStreamsGroupOffsetsResult result =
        admin.alterStreamsGroupOffsets("streams-app-1", newOffsets);

    result.all().get();
    System.out.println("Streams group offsets updated");
}

Delete Streams Groups

// Admin interface method
DeleteStreamsGroupsResult deleteStreamsGroups(Collection<String> groupIds);
DeleteStreamsGroupsResult deleteStreamsGroups(Collection<String> groupIds,
                                             DeleteStreamsGroupsOptions options);

Delete Streams Group Offsets

// Admin interface method
DeleteStreamsGroupOffsetsResult deleteStreamsGroupOffsets(String groupId,
                                                         Set<TopicPartition> partitions);
DeleteStreamsGroupOffsetsResult deleteStreamsGroupOffsets(String groupId,
                                                         Set<TopicPartition> partitions,
                                                         DeleteStreamsGroupOffsetsOptions options);

Usage Example:

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartition;
import java.util.*;

try (Admin admin = Admin.create(props)) {
    // Delete offsets for specific partitions
    Set<TopicPartition> partitions = new HashSet<>();
    partitions.add(new TopicPartition("input-topic", 0));

    DeleteStreamsGroupOffsetsResult result =
        admin.deleteStreamsGroupOffsets("streams-app-1", partitions);

    result.all().get();
    System.out.println("Streams group offsets deleted");

    // Delete entire streams group
    DeleteStreamsGroupsResult deleteResult =
        admin.deleteStreamsGroups(Collections.singletonList("streams-app-1"));

    deleteResult.all().get();
    System.out.println("Streams group deleted");
}

Producer Operations

Describe Producers

// Admin interface method
DescribeProducersResult describeProducers(Collection<TopicPartition> partitions);
DescribeProducersResult describeProducers(Collection<TopicPartition> partitions,
                                         DescribeProducersOptions options);

Fence Producers

// Admin interface method
FenceProducersResult fenceProducers(Collection<String> transactionalIds);
FenceProducersResult fenceProducers(Collection<String> transactionalIds,
                                   FenceProducersOptions options);

Options Classes

All options classes extend AbstractOptions<T> with timeout configuration:

package org.apache.kafka.clients.admin;

public abstract class AbstractOptions<T extends AbstractOptions<T>> {
    public T timeoutMs(Integer timeoutMs);
    public Integer timeoutMs();
}

Common Options Classes:

  • CreateTopicsOptions
  • DeleteTopicsOptions
  • ListTopicsOptions
  • DescribeTopicsOptions
  • CreatePartitionsOptions
  • DescribeConfigsOptions
  • AlterConfigsOptions
  • DescribeConsumerGroupsOptions
  • ListConsumerGroupsOptions
  • ListConsumerGroupOffsetsOptions
  • AlterConsumerGroupOffsetsOptions
  • DeleteConsumerGroupsOptions
  • RemoveMembersFromConsumerGroupOptions
  • DescribeClusterOptions
  • ListOffsetsOptions
  • And 40+ more...

Usage Example:

import org.apache.kafka.clients.admin.*;
import java.time.Duration;

CreateTopicsOptions options = new CreateTopicsOptions()
    .timeoutMs(30000)
    .validateOnly(true); // Validate without creating

ListTopicsOptions listOptions = new ListTopicsOptions()
    .timeoutMs(10000)
    .listInternal(false);

Result Classes

All result classes provide KafkaFuture<T> methods for asynchronous operations:

package org.apache.kafka.clients.admin;

// Common pattern
public class SomeResult {
    // Future for all items
    KafkaFuture<Void> all();

    // Futures for individual items
    Map<String, KafkaFuture<ItemType>> values();
}

KafkaFuture:

package org.apache.kafka.common;

// KafkaFuture is an abstract class (not an interface) that implements
// java.util.concurrent.Future<T>. BiConsumer and BaseFunction below are its
// nested functional interfaces (KafkaFuture.BiConsumer, KafkaFuture.BaseFunction),
// not the java.util.function types.
public abstract class KafkaFuture<T> implements Future<T> {
    // Blocking methods
    public abstract T get() throws InterruptedException, ExecutionException;
    public abstract T get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
    public abstract T getNow(T valueIfAbsent) throws InterruptedException, ExecutionException;

    // Status methods
    public abstract boolean cancel(boolean mayInterruptIfRunning);
    public abstract boolean isCancelled();
    public abstract boolean isDone();
    public abstract boolean isCompletedExceptionally();

    // Composition methods
    public abstract KafkaFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
    public abstract <R> KafkaFuture<R> thenApply(BaseFunction<T, R> function);

    // Static factory methods
    public static <U> KafkaFuture<U> completedFuture(U value);
    public static KafkaFuture<Void> allOf(KafkaFuture<?>... futures);

    // Conversion
    public abstract CompletionStage<T> toCompletionStage();
}

Static Factory Methods:

Create pre-completed futures or combine multiple futures:

// Create a completed future
KafkaFuture<String> completed = KafkaFuture.completedFuture("result");

// Wait for all futures to complete
KafkaFuture<Void> all = KafkaFuture.allOf(future1, future2, future3);

Future Composition Example:

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;

try (Admin admin = Admin.create(props)) {
    // Chain operations with thenApply
    KafkaFuture<Integer> topicCountFuture = admin
        .listTopics()
        .names()
        .thenApply(names -> names.size());

    System.out.println("Total topics: " + topicCountFuture.get());

    // Use whenComplete for callbacks (doesn't transform the result)
    admin.createTopics(Collections.singletonList(new NewTopic("my-topic", 3, (short) 2)))
        .all()
        .whenComplete((v, throwable) -> {
            if (throwable != null) {
                System.err.println("Failed to create topic: " + throwable.getMessage());
            } else {
                System.out.println("Topic created successfully");
            }
        });

    // Convert to CompletionStage for advanced composition with CompletableFuture
    CompletionStage<Set<String>> stage = admin.listTopics().names().toCompletionStage();
    stage.thenAccept(names -> System.out.println("Topics: " + names));
}

Usage Example:

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;
import java.util.*;

try (Admin admin = Admin.create(props)) {
    CreateTopicsResult result = admin.createTopics(
        Collections.singletonList(new NewTopic("my-topic", 3, (short) 2)));

    // Blocking wait
    result.all().get();

    // Non-blocking callback
    result.all().whenComplete((v, throwable) -> {
        if (throwable != null) {
            System.err.println("Failed to create topic: " + throwable.getMessage());
        } else {
            System.out.println("Topic created successfully");
        }
    });
}

Feature Management

Manage and inspect feature flags in the Kafka cluster. Use these operations to upgrade or downgrade cluster features.

Describe Features

// Admin interface method
DescribeFeaturesResult describeFeatures();
DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);

DescribeFeaturesResult:

package org.apache.kafka.clients.admin;

public class DescribeFeaturesResult {
    KafkaFuture<FeatureMetadata> featureMetadata();
}

FeatureMetadata:

package org.apache.kafka.clients.admin;

public class FeatureMetadata {
    Map<String, FinalizedVersionRange> finalizedFeatures();
    Optional<Long> finalizedFeaturesEpoch();
    Map<String, SupportedVersionRange> supportedFeatures();
}

Usage Example:

import org.apache.kafka.clients.admin.*;
import java.util.*;

try (Admin admin = Admin.create(props)) {
    DescribeFeaturesResult result = admin.describeFeatures();

    FeatureMetadata metadata = result.featureMetadata().get();

    System.out.println("Finalized Features:");
    for (Map.Entry<String, FinalizedVersionRange> entry :
         metadata.finalizedFeatures().entrySet()) {
        FinalizedVersionRange range = entry.getValue();
        System.out.println("  " + entry.getKey() +
            ": min=" + range.minVersionLevel() +
            ", max=" + range.maxVersionLevel());
    }

    System.out.println("\nSupported Features:");
    for (Map.Entry<String, SupportedVersionRange> entry :
         metadata.supportedFeatures().entrySet()) {
        SupportedVersionRange range = entry.getValue();
        System.out.println("  " + entry.getKey() +
            ": min=" + range.minVersion() +
            ", max=" + range.maxVersion());
    }
}

Update Features

// Admin interface method
UpdateFeaturesResult updateFeatures(Map<String, FeatureUpdate> featureUpdates);
UpdateFeaturesResult updateFeatures(Map<String, FeatureUpdate> featureUpdates,
                                   UpdateFeaturesOptions options);

FeatureUpdate:

package org.apache.kafka.clients.admin;

public class FeatureUpdate {
    public FeatureUpdate(short maxVersionLevel, UpgradeType upgradeType);

    short maxVersionLevel();
    UpgradeType upgradeType();

    enum UpgradeType {
        UNKNOWN,
        UPGRADE,           // Upgrade the feature level
        SAFE_DOWNGRADE,    // Downgrade without metadata loss
        UNSAFE_DOWNGRADE   // Downgrade allowing metadata loss
    }
}

UpdateFeaturesResult:

package org.apache.kafka.clients.admin;

public class UpdateFeaturesResult {
    Map<String, KafkaFuture<Void>> values();
    KafkaFuture<Void> all();
}

Usage Example:

import org.apache.kafka.clients.admin.*;
import java.util.*;

try (Admin admin = Admin.create(props)) {
    // Upgrade a feature to version 2
    Map<String, FeatureUpdate> updates = new HashMap<>();
    updates.put("group.version", new FeatureUpdate(
        (short) 2,
        FeatureUpdate.UpgradeType.UPGRADE
    ));

    UpdateFeaturesResult result = admin.updateFeatures(updates);
    result.all().get();
    System.out.println("Feature updated successfully");

    // Safe downgrade a feature
    Map<String, FeatureUpdate> downgrades = new HashMap<>();
    downgrades.put("metadata.version", new FeatureUpdate(
        (short) 1,
        FeatureUpdate.UpgradeType.SAFE_DOWNGRADE
    ));

    UpdateFeaturesResult downgradeResult = admin.updateFeatures(downgrades);
    downgradeResult.all().get();
    System.out.println("Feature downgraded successfully");
}

Metadata Operations

Describe Metadata Quorum

Query information about the KRaft metadata quorum. This operation is only available when using KRaft mode (not ZooKeeper).

// Admin interface method
DescribeMetadataQuorumResult describeMetadataQuorum();
DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options);

DescribeMetadataQuorumResult:

package org.apache.kafka.clients.admin;

public class DescribeMetadataQuorumResult {
    KafkaFuture<QuorumInfo> quorumInfo();
}

Usage Example:

import org.apache.kafka.clients.admin.*;

try (Admin admin = Admin.create(props)) {
    DescribeMetadataQuorumResult result = admin.describeMetadataQuorum();

    QuorumInfo info = result.quorumInfo().get();

    System.out.println("Leader ID: " + info.leaderId());
    System.out.println("Leader Epoch: " + info.leaderEpoch());
    System.out.println("High Watermark: " + info.highWatermark());

    // ReplicaState is a nested class of QuorumInfo, so it has to be qualified;
    // the wildcard import of the admin package does not bring it into scope.
    System.out.println("Voters:");
    for (QuorumInfo.ReplicaState voter : info.voters()) {
        System.out.println("  ID: " + voter.replicaId() +
            ", Offset: " + voter.logEndOffset());
    }

    System.out.println("Observers:");
    for (QuorumInfo.ReplicaState observer : info.observers()) {
        System.out.println("  ID: " + observer.replicaId() +
            ", Offset: " + observer.logEndOffset());
    }
}

Group Listing Operations

List Groups

List all consumer, share, and streams groups in the cluster. This operation replaces the deprecated listConsumerGroups() method and provides a unified view of all group types.

// Admin interface method
ListGroupsResult listGroups();
ListGroupsResult listGroups(ListGroupsOptions options);

ListGroupsResult:

package org.apache.kafka.clients.admin;

public class ListGroupsResult {
    KafkaFuture<Collection<GroupListing>> all();
    KafkaFuture<Collection<GroupListing>> valid();
    KafkaFuture<Collection<Throwable>> errors();
}

GroupListing:

package org.apache.kafka.clients.admin;

public class GroupListing {
    String groupId();
    Optional<GroupType> type();
    Optional<String> protocol();
    Optional<GroupState> state();
}

Usage Example:

import org.apache.kafka.clients.admin.*;
import java.util.*;

try (Admin admin = Admin.create(props)) {
    ListGroupsResult result = admin.listGroups();

    Collection<GroupListing> groups = result.all().get();
    for (GroupListing group : groups) {
        System.out.println("Group: " + group.groupId());
        System.out.println("  Type: " + group.type().orElse(null));
        System.out.println("  State: " + group.state().orElse(null));
        System.out.println("  Protocol: " + group.protocol().orElse("unknown"));
    }

    // Only get valid groups (filter out errors)
    Collection<GroupListing> validGroups = result.valid().get();
    System.out.println("\nTotal valid groups: " + validGroups.size());
}

Describe Classic Groups

Describe legacy consumer groups (groups using the classic group protocol, not the new consumer group protocol).

// Admin interface method
DescribeClassicGroupsResult describeClassicGroups(Collection<String> groupIds);
DescribeClassicGroupsResult describeClassicGroups(Collection<String> groupIds,
                                                 DescribeClassicGroupsOptions options);

Usage Example:

import org.apache.kafka.clients.admin.*;
import java.util.*;

try (Admin admin = Admin.create(props)) {
    DescribeClassicGroupsResult result = admin.describeClassicGroups(
        Arrays.asList("legacy-group-1"));

    Map<String, ClassicGroupDescription> descriptions = result.all().get();
    for (Map.Entry<String, ClassicGroupDescription> entry : descriptions.entrySet()) {
        ClassicGroupDescription desc = entry.getValue();
        System.out.println("Classic Group: " + desc.groupId());
        // protocol() carries the group's protocol type (e.g. "consumer")
        System.out.println("  Protocol Type: " + desc.protocol());
        System.out.println("  State: " + desc.state());
        System.out.println("  Members: " + desc.members().size());

        for (MemberDescription member : desc.members()) {
            // MemberDescription exposes the member id through consumerId()
            System.out.println("    Member: " + member.consumerId());
            System.out.println("      Client ID: " + member.clientId());
            System.out.println("      Host: " + member.host());
        }
    }
}

Broker and KRaft Operations

Unregister Broker

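Remove a broker's registration from the cluster metadata. Use this in KRaft mode after a broker has been shut down for decommissioning; it does not reassign or move the partitions that were hosted on that broker.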

// Admin interface method
UnregisterBrokerResult unregisterBroker(int brokerId);
UnregisterBrokerResult unregisterBroker(int brokerId, UnregisterBrokerOptions options);

Usage Example:

import org.apache.kafka.clients.admin.*;

try (Admin admin = Admin.create(props)) {
    UnregisterBrokerResult result = admin.unregisterBroker(3);

    result.all().get();
    System.out.println("Broker 3 unregistered successfully");
}

Add Raft Voter

Add a new voter to the KRaft metadata quorum. This operation is only available in KRaft mode.

// Admin interface method
AddRaftVoterResult addRaftVoter(int voterId, Uuid voterDirectoryId, Set<RaftVoterEndpoint> endpoints);
AddRaftVoterResult addRaftVoter(int voterId, Uuid voterDirectoryId, Set<RaftVoterEndpoint> endpoints,
                               AddRaftVoterOptions options);

RaftVoterEndpoint:

package org.apache.kafka.clients.admin;

public class RaftVoterEndpoint {
    public RaftVoterEndpoint(String name, String host, int port);

    String name();
    String host();
    int port();
}

Usage Example:

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.Uuid;
import java.util.*;

try (Admin admin = Admin.create(props)) {
    // The endpoint name is the controller listener name and must be UPPERCASE;
    // RaftVoterEndpoint rejects lowercase names with IllegalArgumentException.
    Set<RaftVoterEndpoint> endpoints = new HashSet<>();
    endpoints.add(new RaftVoterEndpoint("CONTROLLER", "broker4.example.com", 9093));

    // The directory ID must be the directory.id of the new controller's metadata
    // log directory (see its meta.properties). The value below is a placeholder;
    // a randomly generated Uuid would not identify the real voter.
    Uuid voterDirectoryId = Uuid.fromString("JaTH2JYK2ed2GzUapg8tgg");

    AddRaftVoterResult result = admin.addRaftVoter(
        4,  // Voter ID
        voterDirectoryId,
        endpoints
    );

    result.all().get();
    System.out.println("Raft voter added successfully");
}

Remove Raft Voter

Remove a voter from the KRaft metadata quorum. This operation is only available in KRaft mode.

// Admin interface method
// A voter is identified by its node ID together with its directory ID.
RemoveRaftVoterResult removeRaftVoter(int voterId, Uuid voterDirectoryId);
RemoveRaftVoterResult removeRaftVoter(int voterId, Uuid voterDirectoryId,
                                     RemoveRaftVoterOptions options);

Usage Example:

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.Uuid;

try (Admin admin = Admin.create(props)) {
    // Placeholder: use the directory ID reported for this voter, e.g. by
    // describeMetadataQuorum() or the controller's meta.properties.
    Uuid voterDirectoryId = Uuid.fromString("JaTH2JYK2ed2GzUapg8tgg");

    RemoveRaftVoterResult result = admin.removeRaftVoter(4, voterDirectoryId);

    result.all().get();
    System.out.println("Raft voter removed successfully");
}

Resource Listing Operations

List Client Metrics Resources

List all client metrics configurations in the cluster. This operation is useful for managing client-side metrics collection.

// Admin interface method
ListClientMetricsResourcesResult listClientMetricsResources();
ListClientMetricsResourcesResult listClientMetricsResources(ListClientMetricsResourcesOptions options);

ListClientMetricsResourcesResult:

package org.apache.kafka.clients.admin;

public class ListClientMetricsResourcesResult {
    KafkaFuture<Collection<ClientMetricsResourceListing>> all();
}

Usage Example:

import org.apache.kafka.clients.admin.*;
import java.util.*;

try (Admin admin = Admin.create(props)) {
    ListClientMetricsResourcesResult result = admin.listClientMetricsResources();

    Collection<ClientMetricsResourceListing> resources = result.all().get();
    for (ClientMetricsResourceListing resource : resources) {
        System.out.println("Client Metrics Resource: " + resource.name());
    }
}

List Config Resources

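List the configuration resources (for example topics, brokers, or client metrics subscriptions) that exist in the cluster, optionally filtered by resource type, without retrieving their actual configuration values.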

// Admin interface method
ListConfigResourcesResult listConfigResources();
ListConfigResourcesResult listConfigResources(Set<ConfigResource.Type> types);
ListConfigResourcesResult listConfigResources(Set<ConfigResource.Type> types,
                                             ListConfigResourcesOptions options);

Usage Example:

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.config.ConfigResource;
import java.util.*;

try (Admin admin = Admin.create(props)) {
    // List all topic config resources
    Set<ConfigResource.Type> types = Collections.singleton(ConfigResource.Type.TOPIC);
    ListConfigResourcesResult result =
        admin.listConfigResources(types, new ListConfigResourcesOptions());

    // The result is a collection of ConfigResource (name + type), not a
    // dedicated listing class.
    Collection<ConfigResource> listings = result.all().get();
    for (ConfigResource listing : listings) {
        System.out.println("Config Resource: " + listing.name() +
            " (Type: " + listing.type() + ")");
    }

    // List config resources of every type
    ListConfigResourcesResult allResult = admin.listConfigResources();
    Collection<ConfigResource> allListings = allResult.all().get();
    System.out.println("\nTotal config resources: " + allListings.size());
}

Best Practices

Resource Management

Always close the admin client:

try (Admin admin = Admin.create(props)) {
    // Use admin client
}
// Automatically closed

// Or manual close
Admin admin = Admin.create(props);
try {
    // Use admin client
} finally {
    admin.close();
}

Error Handling

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.errors.*;
import java.util.concurrent.ExecutionException;

try (Admin admin = Admin.create(props)) {
    CreateTopicsResult result = admin.createTopics(
        Collections.singletonList(new NewTopic("my-topic", 3, (short) 2)));

    result.all().get();
} catch (ExecutionException e) {
    Throwable cause = e.getCause();
    if (cause instanceof TopicExistsException) {
        System.out.println("Topic already exists");
    } else if (cause instanceof InvalidReplicationFactorException) {
        System.err.println("Invalid replication factor");
    } else if (cause instanceof InvalidPartitionsException) {
        System.err.println("Invalid partition count");
    } else {
        System.err.println("Failed to create topic: " + cause.getMessage());
    }
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}

Batch Operations

Process multiple resources efficiently:

import org.apache.kafka.clients.admin.*;
import java.util.*;

try (Admin admin = Admin.create(props)) {
    // Create multiple topics in one call
    List<NewTopic> topics = Arrays.asList(
        new NewTopic("topic1", 3, (short) 2),
        new NewTopic("topic2", 5, (short) 2),
        new NewTopic("topic3", 1, (short) 2)
    );

    CreateTopicsResult result = admin.createTopics(topics);

    // Wait for all or handle individually
    for (Map.Entry<String, KafkaFuture<Void>> entry : result.values().entrySet()) {
        try {
            entry.getValue().get();
            System.out.println("Created topic: " + entry.getKey());
        } catch (Exception e) {
            System.err.println("Failed to create topic " + entry.getKey() +
                ": " + e.getMessage());
        }
    }
}

Monitoring and Metrics

import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import java.util.Map;

try (Admin admin = Admin.create(props)) {
    Map<MetricName, ? extends Metric> metrics = admin.metrics();

    for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
        MetricName name = entry.getKey();
        System.out.println(name.group() + "." + name.name() + ": " +
            entry.getValue().metricValue());
    }
}

Troubleshooting

Common Admin API Issues

Issue: TopicExistsException

Symptoms:

  • TopicExistsException when creating topics
  • Topic creation fails

Causes:

  • Topic already exists
  • Race condition between multiple admin clients
  • Previous creation not fully completed

Solutions:

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.errors.TopicExistsException;
import java.util.*;
import java.util.concurrent.ExecutionException;

/**
 * Creates a topic, treating "topic already exists" as success.
 *
 * <p>When the topic already exists, its partition count is compared with the
 * requested one and a warning is printed on mismatch.
 *
 * @param admin             admin client used for the create and describe calls
 * @param topicName         name of the topic to create
 * @param partitions        requested partition count
 * @param replicationFactor requested replication factor
 * @throws RuntimeException if creation fails for any reason other than the topic
 *                          already existing, if the follow-up describe fails, or
 *                          if the calling thread is interrupted
 */
public void createTopicIdempotent(Admin admin, String topicName,
                                   int partitions, short replicationFactor) {
    NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor);

    try {
        admin.createTopics(Collections.singletonList(newTopic)).all().get();
        System.out.println("Topic created: " + topicName);
    } catch (ExecutionException e) {
        if (!(e.getCause() instanceof TopicExistsException)) {
            throw new RuntimeException("Failed to create topic", e);
        }
        System.out.println("Topic already exists: " + topicName);

        // Optionally verify configuration matches. This runs in a helper with its
        // own exception handling: checked exceptions thrown from inside a catch
        // block are not handled by the sibling catch clauses of this try.
        warnIfPartitionCountDiffers(admin, topicName, partitions);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException("Interrupted", e);
    }
}

/** Describes an existing topic and prints a warning if its partition count is not the expected one. */
private void warnIfPartitionCountDiffers(Admin admin, String topicName, int expectedPartitions) {
    try {
        TopicDescription description = admin
            .describeTopics(Collections.singletonList(topicName))
            .allTopicNames()
            .get()
            .get(topicName);

        if (description != null && description.partitions().size() != expectedPartitions) {
            System.err.println("WARNING: Topic exists but has " +
                description.partitions().size() + " partitions, expected " + expectedPartitions);
        }
    } catch (ExecutionException e) {
        throw new RuntimeException("Failed to describe existing topic " + topicName, e);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException("Interrupted", e);
    }
}

Prevention:

  • Check topic existence before creation
  • Use idempotent creation logic
  • Handle TopicExistsException gracefully

Issue: TimeoutException on Admin Operations

Symptoms:

  • Admin operations timing out
  • Long-running operations fail
  • Intermittent failures

Causes:

  • Broker overload
  • Network latency
  • Insufficient timeout configuration
  • Large operations (many topics/partitions)

Solutions:

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Increase default timeout
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000); // 60 seconds
props.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 120000); // 2 minutes

// `topics` and `largeBatchOfTopics` below are Collection<NewTopic> values
// prepared by the caller.
try (Admin admin = Admin.create(props)) {
    // Or set timeout per operation
    CreateTopicsOptions options = new CreateTopicsOptions()
        .timeoutMs(120000); // 2 minutes

    CreateTopicsResult result = admin.createTopics(topics, options);
    result.all().get();
}

// For very long operations, monitor progress. The admin client must still be
// open while the futures are awaited, so this runs inside its own try block.
try (Admin admin = Admin.create(props)) {
    CreateTopicsResult result = admin.createTopics(largeBatchOfTopics);

    // Check individual topic results
    Map<String, KafkaFuture<Void>> values = result.values();
    for (Map.Entry<String, KafkaFuture<Void>> entry : values.entrySet()) {
        try {
            entry.getValue().get(60, TimeUnit.SECONDS);
            System.out.println("Created: " + entry.getKey());
        } catch (TimeoutException e) {
            System.err.println("Timeout creating: " + entry.getKey());
        } catch (ExecutionException e) {
            System.err.println("Error creating " + entry.getKey() +
                ": " + e.getCause().getMessage());
        } catch (InterruptedException e) {
            // Restore the interrupt flag and stop waiting on the remaining topics
            Thread.currentThread().interrupt();
            break;
        }
    }
}

Prevention:

  • Set appropriate timeouts
  • Batch operations reasonably
  • Monitor broker performance
  • Use async operations with callbacks

Issue: UnknownTopicOrPartitionException

Symptoms:

  • UnknownTopicOrPartitionException on describe/delete
  • Operations fail for non-existent topics
  • Intermittent failures after topic creation

Causes:

  • Topic doesn't exist
  • Metadata not yet propagated
  • Topic recently deleted

Solutions:

/**
 * Polls describeTopics until the topic's metadata is visible or the wait budget
 * is used up.
 *
 * @param admin     admin client used to describe the topic
 * @param topicName topic to wait for
 * @param maxWaitMs maximum total time to wait, in milliseconds
 * @throws Exception a TimeoutException when the topic is still unknown after
 *                   maxWaitMs; the ExecutionException from describeTopics for any
 *                   failure other than UnknownTopicOrPartitionException; or
 *                   InterruptedException if interrupted while waiting
 */
public void waitForTopicCreation(Admin admin, String topicName,
                                  int maxWaitMs) throws Exception {
    final long pollIntervalMs = 1000;
    // nanoTime is monotonic, so wall-clock adjustments cannot stretch or cut the wait
    final long deadlineNanos = System.nanoTime() + maxWaitMs * 1_000_000L;

    while (deadlineNanos - System.nanoTime() > 0) {
        try {
            DescribeTopicsResult result =
                admin.describeTopics(Collections.singletonList(topicName));
            result.allTopicNames().get();
            System.out.println("Topic ready: " + topicName);
            return;
        } catch (ExecutionException e) {
            if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
                throw e;
            }
            System.out.println("Waiting for topic metadata to propagate...");

            // Never sleep past the deadline: a short maxWaitMs must not turn
            // into a full poll interval of waiting.
            long remainingMs = (deadlineNanos - System.nanoTime()) / 1_000_000L;
            if (remainingMs > 0) {
                Thread.sleep(Math.min(pollIntervalMs, remainingMs));
            }
        }
    }

    throw new TimeoutException("Topic not ready after " + maxWaitMs + "ms");
}

// Usage
admin.createTopics(Collections.singletonList(newTopic)).all().get();
waitForTopicCreation(admin, topicName, 30000); // Wait up to 30s

Prevention:

  • Wait for metadata propagation after creation
  • List topics before describe/delete
  • Handle UnknownTopicOrPartitionException gracefully

Edge Cases

Concurrent Topic Operations

// Multiple admin clients operating simultaneously
// Can cause conflicts

// Safe: Check-then-create pattern with proper exception handling
try (Admin admin = Admin.create(props)) {
    // Check if topic exists
    ListTopicsResult listResult = admin.listTopics();
    Set<String> existingTopics = listResult.names().get();
    
    if (!existingTopics.contains("my-topic")) {
        try {
            admin.createTopics(
                Collections.singletonList(new NewTopic("my-topic", 3, (short) 2))
            ).all().get();
            System.out.println("Topic created");
        } catch (ExecutionException e) {
            if (e.getCause() instanceof TopicExistsException) {
                // Another client created it between check and create
                System.out.println("Topic created by another client");
            } else {
                throw e;
            }
        }
    }
}

Partial Batch Failures

// When creating multiple topics, some may succeed and others fail
try (Admin admin = Admin.create(props)) {
    List<NewTopic> topics = Arrays.asList(
        new NewTopic("topic1", 3, (short) 2),
        new NewTopic("existing-topic", 3, (short) 2), // Already exists
        new NewTopic("topic3", 3, (short) 2)
    );
    
    CreateTopicsResult result = admin.createTopics(topics);
    
    // Don't use result.all().get() - stops at first failure
    
    // Instead, check each individually
    Map<String, KafkaFuture<Void>> values = result.values();
    Set<String> successful = new HashSet<>();
    Set<String> failed = new HashSet<>();
    
    for (Map.Entry<String, KafkaFuture<Void>> entry : values.entrySet()) {
        try {
            entry.getValue().get();
            successful.add(entry.getKey());
        } catch (ExecutionException e) {
            failed.add(entry.getKey());
            System.err.println("Failed to create " + entry.getKey() + 
                ": " + e.getCause().getMessage());
        }
    }
    
    System.out.println("Created: " + successful);
    System.out.println("Failed: " + failed);
}

Consumer Group in Rebalancing State

// Describing consumer groups during rebalance
try (Admin admin = Admin.create(props)) {
    DescribeConsumerGroupsResult result =
        admin.describeConsumerGroups(Collections.singletonList("my-group"));

    ConsumerGroupDescription description =
        result.all().get().get("my-group");

    // groupState() returns org.apache.kafka.common.GroupState. There is no single
    // REBALANCING constant: classic groups report PREPARING_REBALANCE or
    // COMPLETING_REBALANCE, groups on the consumer protocol report ASSIGNING or
    // RECONCILING.
    GroupState state = description.groupState();
    boolean rebalancing = state == GroupState.PREPARING_REBALANCE
        || state == GroupState.COMPLETING_REBALANCE
        || state == GroupState.ASSIGNING
        || state == GroupState.RECONCILING;

    if (rebalancing) {
        System.out.println("Group is rebalancing - member info may be incomplete");

        // Wait and retry
        Thread.sleep(5000);
        DescribeConsumerGroupsResult retryResult =
            admin.describeConsumerGroups(Collections.singletonList("my-group"));
        description = retryResult.all().get().get("my-group");
    }

    System.out.println("State: " + description.groupState());
    System.out.println("Members: " + description.members().size());
}

Deleting Active Consumer Group

// Cannot delete active consumer group
try (Admin admin = Admin.create(props)) {
    try {
        admin.deleteConsumerGroups(Collections.singletonList("my-group")).all().get();
        System.out.println("Group deleted");
    } catch (ExecutionException e) {
        if (e.getCause() instanceof GroupNotEmptyException) {
            System.out.println("Group has active members, cannot delete");
            
            // Option 1: Remove members first
            RemoveMembersFromConsumerGroupOptions removeOptions =
                new RemoveMembersFromConsumerGroupOptions();
            admin.removeMembersFromConsumerGroup("my-group", removeOptions).all().get();
            Thread.sleep(5000); // Wait for members to leave
            
            // Then delete
            admin.deleteConsumerGroups(Collections.singletonList("my-group")).all().get();
            
            // Option 2: Wait for consumers to close
            System.out.println("Close all consumers in the group first");
        } else {
            throw e;
        }
    }
}

Best Practices for Admin Operations

Checking Operation Feasibility

/**
 * Example helpers that run pre-flight checks before creating or deleting a topic.
 */
public class SafeAdminOperations {

    /**
     * Creates a topic after checking that the cluster has enough brokers for the
     * requested replication factor and that the topic is not already listed.
     *
     * <p>The existence check is advisory only: another client can create the topic
     * between the check and the create call, in which case createTopics still
     * fails with TopicExistsException.
     *
     * @throws IllegalArgumentException if replicationFactor exceeds the broker count
     * @throws Exception                any failure from the underlying admin calls
     */
    public void safeCreateTopic(Admin admin, String topicName,
                                int partitions, short replicationFactor) throws Exception {
        // Check cluster has enough brokers for replication
        DescribeClusterResult clusterResult = admin.describeCluster();
        int brokerCount = clusterResult.nodes().get().size();

        if (replicationFactor > brokerCount) {
            throw new IllegalArgumentException(
                "Replication factor " + replicationFactor +
                " exceeds broker count " + brokerCount);
        }

        // Check topic doesn't exist
        ListTopicsResult listResult = admin.listTopics();
        if (listResult.names().get().contains(topicName)) {
            System.out.println("Topic already exists: " + topicName);
            return;
        }

        // Create topic
        NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor);
        admin.createTopics(Collections.singletonList(newTopic)).all().get();
        System.out.println("Topic created: " + topicName);
    }

    /**
     * Deletes a topic if it exists, first warning about every consumer group that
     * has committed offsets for it. The warnings do not block the deletion.
     *
     * @throws Exception any failure from the underlying admin calls
     */
    public void safeDeleteTopic(Admin admin, String topicName) throws Exception {
        // Check topic exists
        ListTopicsResult listResult = admin.listTopics();
        if (!listResult.names().get().contains(topicName)) {
            System.out.println("Topic doesn't exist: " + topicName);
            return;
        }

        // Check for consumer groups with committed offsets on the topic.
        // listGroups(ListGroupsOptions.forConsumerGroups()) is the replacement for
        // the deprecated listConsumerGroups().
        Collection<GroupListing> groups =
            admin.listGroups(ListGroupsOptions.forConsumerGroups()).all().get();

        for (GroupListing group : groups) {
            ListConsumerGroupOffsetsResult offsetResult =
                admin.listConsumerGroupOffsets(group.groupId());
            Map<TopicPartition, OffsetAndMetadata> offsets =
                offsetResult.partitionsToOffsetAndMetadata().get();

            boolean hasOffsetsForTopic = offsets.keySet().stream()
                .anyMatch(tp -> tp.topic().equals(topicName));

            if (hasOffsetsForTopic) {
                // Committed offsets show the group has consumed the topic at some
                // point; they do not prove it is consuming right now.
                System.err.println("WARNING: Consumer group " + group.groupId() +
                    " has committed offsets for topic " + topicName);
            }
        }

        // Delete topic
        admin.deleteTopics(Collections.singletonList(topicName)).all().get();
        System.out.println("Topic deleted: " + topicName);
    }
}

Rate Limiting Admin Operations

import org.apache.kafka.clients.admin.*;
import java.util.*;
import java.util.concurrent.Semaphore;

/**
 * Wraps an {@link Admin} client and throttles topic creation with a
 * semaphore-based limiter.
 *
 * <p>Each call takes one permit before talking to the broker and hands the
 * permit back one second after the call finishes, whether it succeeded or
 * failed. At most {@code operationsPerSecond} permits exist, so callers block
 * in {@link #createTopicWithRateLimit(String)} once all of them are in use.
 */
public class RateLimitedAdmin {
    /** Delay, in milliseconds, before a used permit is returned to the limiter. */
    private static final long PERMIT_RELEASE_DELAY_MS = 1000;

    private final Admin admin;
    private final Semaphore rateLimiter;

    // A single daemon timer shared by all calls. Creating a new Timer per call
    // would start one non-daemon thread per operation; those threads linger
    // until the Timer is garbage collected and can keep the JVM from exiting.
    private final Timer permitReleaser =
        new Timer("rate-limited-admin-permit-releaser", true);

    /**
     * @param admin               client used to perform the admin operations
     * @param operationsPerSecond number of permits available to callers; must be positive
     * @throws IllegalArgumentException if {@code operationsPerSecond} is not positive,
     *         because a limiter without permits would block every caller forever
     */
    public RateLimitedAdmin(Admin admin, int operationsPerSecond) {
        if (operationsPerSecond <= 0) {
            throw new IllegalArgumentException(
                "operationsPerSecond must be positive: " + operationsPerSecond);
        }
        this.admin = admin;
        this.rateLimiter = new Semaphore(operationsPerSecond);
    }

    /**
     * Creates a topic with 3 partitions and replication factor 2, blocking
     * first until a rate-limiter permit is available.
     *
     * @param topicName name of the topic to create
     * @throws Exception if waiting for a permit is interrupted or topic creation fails
     */
    public void createTopicWithRateLimit(String topicName) throws Exception {
        rateLimiter.acquire();

        try {
            NewTopic newTopic = new NewTopic(topicName, 3, (short) 2);
            admin.createTopics(Collections.singletonList(newTopic)).all().get();
            System.out.println("Created topic: " + topicName);
        } finally {
            // Release permit after 1 second, even when creation failed,
            // so a failing broker cannot permanently drain the limiter.
            permitReleaser.schedule(new TimerTask() {
                @Override
                public void run() {
                    rateLimiter.release();
                }
            }, PERMIT_RELEASE_DELAY_MS);
        }
    }
}

Handling Async Operation Results

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;
import java.util.*;
import java.util.concurrent.CompletableFuture;

/**
 * Examples of consuming Admin API results asynchronously by bridging
 * {@link KafkaFuture} instances to {@link CompletableFuture}.
 */
public class AsyncAdminOperations {

    /**
     * Starts creation of the given topics and exposes the outcome as a
     * {@link CompletableFuture}.
     *
     * @param admin  client used to issue the request
     * @param topics topics to create
     * @return a future that completes with {@code null} once all topics are
     *         created, or exceptionally with the failure reported by Kafka
     */
    public CompletableFuture<Void> createTopicsAsync(Admin admin,
                                                      List<NewTopic> topics) {
        return bridge(admin.createTopics(topics).all());
    }

    /**
     * Creates {@code topic1} and {@code topic2}, then describes them and
     * prints the partition count of each. Any failure along the chain is
     * reported on standard error; the method itself returns immediately.
     *
     * @param admin client used for both requests
     */
    public void chainOperations(Admin admin) {
        List<NewTopic> toCreate = Arrays.asList(
            new NewTopic("topic1", 3, (short) 2),
            new NewTopic("topic2", 5, (short) 2)
        );

        createTopicsAsync(admin, toCreate)
            // Describe only after creation has completed
            .thenCompose(ignored ->
                bridge(admin.describeTopics(Arrays.asList("topic1", "topic2")).allTopicNames()))
            .thenAccept(descriptions ->
                descriptions.forEach((name, description) ->
                    System.out.println("Topic " + name +
                        " has " + description.partitions().size() + " partitions")))
            .exceptionally(error -> {
                System.err.println("Operation failed: " + error.getMessage());
                return null;
            });
    }

    /**
     * Mirrors the outcome of a {@link KafkaFuture} onto a new
     * {@link CompletableFuture}: the value on success, the throwable on failure.
     */
    private static <T> CompletableFuture<T> bridge(KafkaFuture<T> source) {
        CompletableFuture<T> target = new CompletableFuture<>();
        source.whenComplete((value, error) -> {
            if (error == null) {
                target.complete(value);
            } else {
                target.completeExceptionally(error);
            }
        });
        return target;
    }
}

Edge Cases

Empty Consumer Groups

// Consumer group with no active members
try (Admin admin = Admin.create(props)) {
    final String groupId = "empty-group";

    // Look up the single group we asked about
    ConsumerGroupDescription description = admin
        .describeConsumerGroups(Collections.singletonList(groupId))
        .all()
        .get()
        .get(groupId);

    if (description.members().isEmpty()) {
        System.out.println("Group has no active members");

        // A group without members can still have committed offsets
        Map<TopicPartition, OffsetAndMetadata> committed = admin
            .listConsumerGroupOffsets(groupId)
            .partitionsToOffsetAndMetadata()
            .get();

        if (!committed.isEmpty()) {
            System.out.println("Group has committed offsets: " + committed.size());
            // Can delete group or reset offsets
        }
    }
}

Partition Reassignment Monitoring

// Monitor partition reassignment progress
/**
 * Polls the reassignment state of {@code partition} until no reassignment is
 * listed for it, printing progress on every poll.
 *
 * <p>While a reassignment is listed, the current, adding and removing replica
 * sets are printed together with the ISR size relative to the replica count,
 * and the method sleeps five seconds before polling again. There is no upper
 * bound on the number of polls.
 *
 * @param admin     client used for the list and describe calls
 * @param partition partition whose reassignment is monitored
 * @throws Exception if an admin call fails or the sleep is interrupted
 */
public void monitorReassignment(Admin admin, TopicPartition partition) throws Exception {
    Map<TopicPartition, PartitionReassignment> inFlight =
        admin.listPartitionReassignments(Collections.singleton(partition))
            .reassignments()
            .get();

    while (!inFlight.isEmpty()) {
        PartitionReassignment current = inFlight.get(partition);
        System.out.println("Reassignment in progress:");
        System.out.println("  Current replicas: " + current.replicas());
        System.out.println("  Adding replicas: " + current.addingReplicas());
        System.out.println("  Removing replicas: " + current.removingReplicas());

        // Check replica sync status via the topic description
        TopicPartitionInfo info = admin
            .describeTopics(Collections.singletonList(partition.topic()))
            .allTopicNames()
            .get()
            .get(partition.topic())
            .partitions()
            .get(partition.partition());

        System.out.println("  ISR: " + info.isr().size() + "/" +
            info.replicas().size());

        Thread.sleep(5000); // Check every 5 seconds

        inFlight = admin.listPartitionReassignments(Collections.singleton(partition))
            .reassignments()
            .get();
    }

    System.out.println("Reassignment complete for " + partition);
}

ACL Operations on Non-Existent Resources

// ACLs can be created for non-existent topics
// This is by design - allows pre-provisioning security

try (Admin admin = Admin.create(props)) {
    // Create ACL for topic that doesn't exist yet
    AclBinding aclBinding = new AclBinding(
        new ResourcePattern(ResourceType.TOPIC, "future-topic", PatternType.LITERAL),
        new AccessControlEntry("User:alice", "*", AclOperation.READ, 
            AclPermissionType.ALLOW)
    );
    
    admin.createAcls(Collections.singleton(aclBinding)).all().get();
    System.out.println("ACL created for non-existent topic");
    
    // Later, when topic is created, ACL is already in effect
    admin.createTopics(Collections.singletonList(
        new NewTopic("future-topic", 3, (short) 2)
    )).all().get();
    
    System.out.println("Topic created with pre-existing ACL");
}

Performance Optimization

Batch Operations Efficiently

// Create many topics efficiently
/**
 * Creates {@code count} topics named {@code topic-0} through
 * {@code topic-(count-1)}, each with 3 partitions and replication factor 2.
 *
 * <p>Topics are submitted in batches of 100. Each batch is awaited before the
 * next one is sent, and the method pauses one second between consecutive
 * batches. No pause follows the final batch. A non-positive {@code count}
 * creates nothing.
 *
 * @param admin client used to create the topics
 * @param count total number of topics to create
 * @throws Exception if a batch fails or the pause is interrupted
 */
public void createManyTopics(Admin admin, int count) throws Exception {
    // Create in batches to avoid overwhelming broker
    final int batchSize = 100;

    for (int start = 0; start < count; start += batchSize) {
        int end = Math.min(start + batchSize, count); // exclusive upper bound

        List<NewTopic> batch = new ArrayList<>(end - start);
        for (int j = start; j < end; j++) {
            batch.add(new NewTopic("topic-" + j, 3, (short) 2));
        }

        admin.createTopics(batch).all().get(); // Wait for batch to complete

        System.out.println("Created batch " + (start / batchSize + 1) +
            ": topics " + start + " to " + (end - 1));

        // Small delay between batches; skipped after the last one so the
        // caller is not held up for a second with no work left to do.
        if (end < count) {
            Thread.sleep(1000);
        }
    }
}

Caching Admin Client

// Admin client is thread-safe and should be shared
/**
 * Holds one lazily created {@link Admin} client for the whole application.
 *
 * <p>The first call to {@link #getInstance(Properties)} creates the client
 * and registers a JVM shutdown hook that closes it with a 30 second timeout.
 * Later calls return the same client; their {@code props} argument is not used.
 */
public class AdminClientPool {
    private static Admin instance;

    /**
     * Returns the shared client, creating it from {@code props} on first use.
     *
     * @param props configuration used only when the client is first created
     * @return the shared Admin client
     */
    public static synchronized Admin getInstance(Properties props) {
        if (instance != null) {
            return instance;
        }

        instance = Admin.create(props);

        // Register shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(AdminClientPool::closeInstance));
        return instance;
    }

    /** Closes the shared client, if one was created, waiting up to 30 seconds. */
    private static void closeInstance() {
        if (instance != null) {
            instance.close(Duration.ofSeconds(30));
        }
    }
}

// Usage across application: callers obtain the client through
// AdminClientPool.getInstance instead of creating their own.
Admin admin = AdminClientPool.getInstance(props);
// The returned result is discarded here, so this snippet neither waits for
// topic creation to finish nor observes a failure.
admin.createTopics(topics);
// Reuse same instance
admin.describeTopics(topicNames);