tessl install tessl/maven-org-apache-kafka--kafka-2-13@4.1.0
Apache Kafka is a distributed event streaming platform that combines publish-subscribe messaging, durable storage, and real-time stream processing capabilities.
The Admin API manages Kafka resources programmatically. The Admin client is thread-safe and can be shared across multiple threads.
Main admin interface for managing Kafka resources.
package org.apache.kafka.clients.admin;
public interface Admin extends AutoCloseable {
// Factory methods
static Admin create(Properties props);
static Admin create(Map<String, Object> conf);
// Lifecycle
void close();
void close(Duration timeout);
// Metrics and identification
Map<MetricName, ? extends Metric> metrics();
void registerMetricForSubscription(KafkaMetric metric);
void unregisterMetricFromSubscription(KafkaMetric metric);
Uuid clientInstanceId(Duration timeout);
}
Basic Usage:
import org.apache.kafka.clients.admin.*;
import java.util.Properties;
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
try (Admin admin = Admin.create(props)) {
// Use admin client
}
// Admin interface method
CreateTopicsResult createTopics(Collection<NewTopic> newTopics);
CreateTopicsResult createTopics(Collection<NewTopic> newTopics,
CreateTopicsOptions options);
NewTopic:
package org.apache.kafka.clients.admin;
public class NewTopic {
// Constructor with partitions and replication factor
public NewTopic(String name, int numPartitions, short replicationFactor);
// Constructor with replica assignments
public NewTopic(String name, Map<Integer, List<Integer>> replicasAssignments);
// Fluent configuration
public NewTopic configs(Map<String, String> configs);
}
CreateTopicsResult:
package org.apache.kafka.clients.admin;
public class CreateTopicsResult {
// Future for all topics
KafkaFuture<Void> all();
// Future for individual topics
Map<String, KafkaFuture<Void>> values();
// Additional metadata
KafkaFuture<Config> config(String topic);
KafkaFuture<Uuid> topicId(String topic);
KafkaFuture<Integer> numPartitions(String topic);
KafkaFuture<Integer> replicationFactor(String topic);
}
Usage Example:
import org.apache.kafka.clients.admin.*;
import java.util.*;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.common.errors.TopicExistsException;
try (Admin admin = Admin.create(props)) {
// Create topic with partitions and replication factor
NewTopic newTopic = new NewTopic("my-topic", 3, (short) 2);
// Add topic configuration
Map<String, String> configs = new HashMap<>();
configs.put("retention.ms", "86400000"); // 1 day
configs.put("compression.type", "snappy");
newTopic.configs(configs);
CreateTopicsResult result = admin.createTopics(Collections.singletonList(newTopic));
// Wait for completion
result.all().get();
System.out.println("Topic created successfully");
} catch (InterruptedException | ExecutionException e) {
if (e.getCause() instanceof TopicExistsException) {
System.out.println("Topic already exists");
}
}
// Admin interface methods
DeleteTopicsResult deleteTopics(Collection<String> topics);
DeleteTopicsResult deleteTopics(TopicCollection topics);
DeleteTopicsResult deleteTopics(TopicCollection topics, DeleteTopicsOptions options);
DeleteTopicsResult:
package org.apache.kafka.clients.admin;
public class DeleteTopicsResult {
// Future for all topics
KafkaFuture<Void> all();
// Futures by topic name
Map<String, KafkaFuture<Void>> topicNameValues();
// Futures by topic ID
Map<Uuid, KafkaFuture<Void>> topicIdValues();
}
Usage Example:
import org.apache.kafka.clients.admin.*;
import java.util.*;
try (Admin admin = Admin.create(props)) {
DeleteTopicsResult result = admin.deleteTopics(
Arrays.asList("topic1", "topic2"));
result.all().get();
System.out.println("Topics deleted successfully");
}
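Topics can also be deleted by topic ID through the TopicCollection overload. A minimal sketch, assuming the Uuid was obtained earlier (e.g. from a describeTopics call):
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicCollection;
import org.apache.kafka.common.Uuid;
import java.util.*;
try (Admin admin = Admin.create(props)) {
    Uuid topicId = ...; // obtained earlier, e.g. from describeTopics()
    DeleteTopicsResult result = admin.deleteTopics(
        TopicCollection.ofTopicIds(Collections.singletonList(topicId)));
    result.all().get();
}
// Admin interface methods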
ListTopicsResult listTopics();
ListTopicsResult listTopics(ListTopicsOptions options);
ListTopicsResult:
package org.apache.kafka.clients.admin;
public class ListTopicsResult {
KafkaFuture<Map<String, TopicListing>> namesToListings();
KafkaFuture<Collection<TopicListing>> listings();
KafkaFuture<Set<String>> names();
}
TopicListing:
package org.apache.kafka.clients.admin;
public class TopicListing {
public String name();
public Uuid topicId();
public boolean isInternal();
}
Usage Example:
import org.apache.kafka.clients.admin.*;
import java.util.*;
try (Admin admin = Admin.create(props)) {
ListTopicsOptions options = new ListTopicsOptions()
.listInternal(false); // Exclude internal topics
ListTopicsResult result = admin.listTopics(options);
Set<String> topicNames = result.names().get();
System.out.println("Topics: " + topicNames);
// Or get full listings
Collection<TopicListing> listings = result.listings().get();
for (TopicListing listing : listings) {
System.out.println("Topic: " + listing.name() +
", Internal: " + listing.isInternal());
}
}
// Admin interface methods
DescribeTopicsResult describeTopics(Collection<String> topicNames);
DescribeTopicsResult describeTopics(TopicCollection topics);
DescribeTopicsResult describeTopics(TopicCollection topics, DescribeTopicsOptions options);
DescribeTopicsResult:
package org.apache.kafka.clients.admin;
public class DescribeTopicsResult {
Map<String, KafkaFuture<TopicDescription>> topicNameValues();
Map<Uuid, KafkaFuture<TopicDescription>> topicIdValues();
KafkaFuture<Map<String, TopicDescription>> allTopicNames();
KafkaFuture<Map<Uuid, TopicDescription>> allTopicIds();
}
TopicDescription:
package org.apache.kafka.clients.admin;
public class TopicDescription {
public String name();
public boolean isInternal();
public List<TopicPartitionInfo> partitions();
public Set<AclOperation> authorizedOperations();
public Uuid topicId();
}
TopicPartitionInfo:
Provides detailed information about a single partition within a topic. This class is distinct from PartitionInfo (used by consumers) in that it includes offline replica information.
package org.apache.kafka.clients.admin;
public class TopicPartitionInfo {
public int partition();
public Node leader();
public List<Node> replicas();
public List<Node> isr(); // In-sync replicas
public List<Node> offlineReplicas();
}
Key Distinctions:
- replicas(): All assigned replicas for this partition
- isr(): In-sync replicas that are caught up with the leader
- offlineReplicas(): Replicas that are currently offline (not available in the consumer's PartitionInfo)
Usage Example:
import org.apache.kafka.clients.admin.*;
import java.util.*;
try (Admin admin = Admin.create(props)) {
DescribeTopicsResult result = admin.describeTopics(
Arrays.asList("my-topic"));
Map<String, TopicDescription> descriptions = result.allTopicNames().get();
for (Map.Entry<String, TopicDescription> entry : descriptions.entrySet()) {
TopicDescription desc = entry.getValue();
System.out.println("Topic: " + desc.name());
System.out.println("Partitions: " + desc.partitions().size());
System.out.println("Internal: " + desc.isInternal());
for (TopicPartitionInfo partition : desc.partitions()) {
System.out.println(" Partition " + partition.partition() +
": leader=" + partition.leader().id() +
", replicas=" + partition.replicas().size() +
", isr=" + partition.isr().size());
}
}
}
// Admin interface method
CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions);
CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions,
CreatePartitionsOptions options);
NewPartitions:
package org.apache.kafka.clients.admin;
public class NewPartitions {
// Increase partition count
public static NewPartitions increaseTo(int totalCount);
// Increase with specific replica assignments
public static NewPartitions increaseTo(int totalCount,
List<List<Integer>> newAssignments);
}
Usage Example:
import org.apache.kafka.clients.admin.*;
import java.util.*;
try (Admin admin = Admin.create(props)) {
Map<String, NewPartitions> newPartitions = new HashMap<>();
newPartitions.put("my-topic", NewPartitions.increaseTo(5));
CreatePartitionsResult result = admin.createPartitions(newPartitions);
result.all().get();
System.out.println("Partitions increased successfully");
}
// Admin interface method
DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources);
DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources,
DescribeConfigsOptions options);
ConfigResource:
package org.apache.kafka.clients.admin;
public class ConfigResource {
public enum Type {
BROKER,
TOPIC,
BROKER_LOGGER,
CLIENT_METRICS
}
public ConfigResource(Type type, String name);
public Type type();
public String name();
}
Config:
package org.apache.kafka.clients.admin;
public class Config {
public Collection<ConfigEntry> entries();
public ConfigEntry get(String name);
}
ConfigEntry:
package org.apache.kafka.clients.admin;
public class ConfigEntry {
public enum ConfigType {
UNKNOWN, BOOLEAN, STRING, INT, SHORT, LONG, DOUBLE, LIST, CLASS, PASSWORD
}
public enum ConfigSource {
UNKNOWN, DYNAMIC_TOPIC_CONFIG, DYNAMIC_BROKER_CONFIG,
DYNAMIC_DEFAULT_BROKER_CONFIG, STATIC_BROKER_CONFIG, DEFAULT_CONFIG
}
public String name();
public String value();
public ConfigSource source();
public boolean isSensitive();
public boolean isReadOnly();
public ConfigType type();
public String documentation();
public List<ConfigSynonym> synonyms();
}
Usage Example:
import org.apache.kafka.clients.admin.*;
import java.util.*;
try (Admin admin = Admin.create(props)) {
ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
DescribeConfigsResult result = admin.describeConfigs(
Collections.singletonList(resource));
Map<ConfigResource, Config> configs = result.all().get();
Config config = configs.get(resource);
for (ConfigEntry entry : config.entries()) {
System.out.println(entry.name() + " = " + entry.value() +
" (source: " + entry.source() + ")");
}
}
// Admin interface method
AlterConfigsResult incrementalAlterConfigs(
Map<ConfigResource, Collection<AlterConfigOp>> configs);
AlterConfigsResult incrementalAlterConfigs(
Map<ConfigResource, Collection<AlterConfigOp>> configs,
AlterConfigsOptions options);
AlterConfigOp:
package org.apache.kafka.clients.admin;
public class AlterConfigOp {
public enum OpType {
SET, // Set value
DELETE, // Delete value
APPEND, // Append to list
SUBTRACT // Remove from list
}
public AlterConfigOp(ConfigEntry configEntry, OpType opType);
public ConfigEntry configEntry();
public OpType opType();
}
Usage Example:
import org.apache.kafka.clients.admin.*;
import java.util.*;
try (Admin admin = Admin.create(props)) {
ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
// Change retention and compression
Collection<AlterConfigOp> ops = Arrays.asList(
new AlterConfigOp(new ConfigEntry("retention.ms", "172800000"),
AlterConfigOp.OpType.SET),
new AlterConfigOp(new ConfigEntry("compression.type", "lz4"),
AlterConfigOp.OpType.SET)
);
Map<ConfigResource, Collection<AlterConfigOp>> configs = new HashMap<>();
configs.put(resource, ops);
AlterConfigsResult result = admin.incrementalAlterConfigs(configs);
result.all().get();
System.out.println("Configuration updated successfully");
}
// Admin interface method
DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> groupIds);
DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> groupIds,
DescribeConsumerGroupsOptions options);
ConsumerGroupDescription:
package org.apache.kafka.clients.admin;
public class ConsumerGroupDescription {
public String groupId();
public boolean isSimpleConsumerGroup();
public Collection<MemberDescription> members();
public String partitionAssignor();
public ConsumerGroupType type();
public ConsumerGroupState groupState();
public Node coordinator();
public Set<AclOperation> authorizedOperations();
public int groupEpoch();
public int targetAssignmentEpoch();
}
MemberDescription:
package org.apache.kafka.clients.admin;
public class MemberDescription {
public String memberId();
public Optional<String> groupInstanceId();
public String clientId();
public String host();
public MemberAssignment assignment();
public Optional<MemberAssignment> targetAssignment();
public int memberEpoch();
public boolean upgraded();
}
Usage Example:
import org.apache.kafka.clients.admin.*;
import java.util.*;
try (Admin admin = Admin.create(props)) {
DescribeConsumerGroupsResult result = admin.describeConsumerGroups(
Arrays.asList("my-group"));
Map<String, ConsumerGroupDescription> descriptions = result.all().get();
for (Map.Entry<String, ConsumerGroupDescription> entry : descriptions.entrySet()) {
ConsumerGroupDescription desc = entry.getValue();
System.out.println("Group: " + desc.groupId());
System.out.println("State: " + desc.groupState());
System.out.println("Coordinator: " + desc.coordinator());
System.out.println("Members: " + desc.members().size());
for (MemberDescription member : desc.members()) {
System.out.println(" Member: " + member.memberId() +
", client: " + member.clientId() +
", host: " + member.host());
}
}
}
// Admin interface method
ListConsumerGroupsResult listConsumerGroups();
ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options);
Usage Example:
import org.apache.kafka.clients.admin.*;
import java.util.*;
try (Admin admin = Admin.create(props)) {
ListConsumerGroupsResult result = admin.listConsumerGroups();
Collection<ConsumerGroupListing> groups = result.all().get();
for (ConsumerGroupListing group : groups) {
System.out.println("Group: " + group.groupId() +
", Type: " + group.type() +
", State: " + group.state());
}
}
// Admin interface methods
ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId);
ListConsumerGroupOffsetsResult listConsumerGroupOffsets(
Map<String, ListConsumerGroupOffsetsSpec> groupSpecs);
ListConsumerGroupOffsetsResult listConsumerGroupOffsets(
Map<String, ListConsumerGroupOffsetsSpec> groupSpecs,
ListConsumerGroupOffsetsOptions options);
ListConsumerGroupOffsetsSpec:
package org.apache.kafka.clients.admin;
public class ListConsumerGroupOffsetsSpec {
// Factory method
public static ListConsumerGroupOffsetsSpec topicPartitions(
Collection<TopicPartition> topicPartitions);
}
Usage Example:
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
try (Admin admin = Admin.create(props)) {
ListConsumerGroupOffsetsResult result =
admin.listConsumerGroupOffsets("my-group");
Map<TopicPartition, OffsetAndMetadata> offsets =
result.partitionsToOffsetAndMetadata().get();
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
System.out.println("Partition: " + entry.getKey() +
", Offset: " + entry.getValue().offset() +
", Metadata: " + entry.getValue().metadata());
}
}
// Admin interface method
AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets(
String groupId,
Map<TopicPartition, OffsetAndMetadata> offsets);
AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets(
String groupId,
Map<TopicPartition, OffsetAndMetadata> offsets,
AlterConsumerGroupOffsetsOptions options);
Usage Example:
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
try (Admin admin = Admin.create(props)) {
// Reset offsets to specific values
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(new TopicPartition("my-topic", 0), new OffsetAndMetadata(100L));
offsets.put(new TopicPartition("my-topic", 1), new OffsetAndMetadata(200L));
AlterConsumerGroupOffsetsResult result =
admin.alterConsumerGroupOffsets("my-group", offsets);
result.all().get();
System.out.println("Offsets reset successfully");
}
// Admin interface method
DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupIds);
DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupIds,
DeleteConsumerGroupsOptions options);
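Usage Example (a minimal sketch; deletion only succeeds for groups with no active members):
import org.apache.kafka.clients.admin.*;
import java.util.*;
try (Admin admin = Admin.create(props)) {
    // Group names here are hypothetical; the groups must be empty
    DeleteConsumerGroupsResult result = admin.deleteConsumerGroups(
        Arrays.asList("old-group-1", "old-group-2"));
    result.all().get();
    System.out.println("Consumer groups deleted successfully");
}
// Admin interface method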
RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroup(
String groupId,
RemoveMembersFromConsumerGroupOptions options);
Usage Example:
import org.apache.kafka.clients.admin.*;
import java.util.*;
try (Admin admin = Admin.create(props)) {
RemoveMembersFromConsumerGroupOptions options =
new RemoveMembersFromConsumerGroupOptions();
// Remove all members (force rebalance)
RemoveMembersFromConsumerGroupResult result =
admin.removeMembersFromConsumerGroup("my-group", options);
result.all().get();
System.out.println("Members removed successfully");
}
// Admin interface method
DescribeClientQuotasResult describeClientQuotas(ClientQuotaFilter filter);
DescribeClientQuotasResult describeClientQuotas(ClientQuotaFilter filter,
DescribeClientQuotasOptions options);
ClientQuotaFilter:
package org.apache.kafka.common.quota;
public class ClientQuotaFilter {
// Match entities with specified components (inclusive)
public static ClientQuotaFilter contains(Collection<ClientQuotaFilterComponent> components);
// Match only entities with exactly these components (strict)
public static ClientQuotaFilter containsOnly(Collection<ClientQuotaFilterComponent> components);
// Match all configured entities
public static ClientQuotaFilter all();
public Collection<ClientQuotaFilterComponent> components();
public boolean strict();
}
ClientQuotaFilterComponent:
package org.apache.kafka.common.quota;
public class ClientQuotaFilterComponent {
// Match exact entity name
public static ClientQuotaFilterComponent ofEntity(String entityType, String entityName);
// Match default entity
public static ClientQuotaFilterComponent ofDefaultEntity(String entityType);
// Match any entity of this type
public static ClientQuotaFilterComponent ofEntityType(String entityType);
public String entityType();
public Optional<String> match();
}
ClientQuotaEntity:
package org.apache.kafka.common.quota;
public class ClientQuotaEntity {
public static final String USER = "user";
public static final String CLIENT_ID = "client-id";
public static final String IP = "ip";
public ClientQuotaEntity(Map<String, String> entries);
public Map<String, String> entries();
}
DescribeClientQuotasResult:
package org.apache.kafka.clients.admin;
public class DescribeClientQuotasResult {
// Returns map of entity to quota values
KafkaFuture<Map<ClientQuotaEntity, Map<String, Double>>> entities();
}
Usage Example:
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.quota.*;
import java.util.*;
try (Admin admin = Admin.create(props)) {
// Describe quotas for a specific user
ClientQuotaFilterComponent userComponent =
ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.USER, "user1");
ClientQuotaFilter filter = ClientQuotaFilter.contains(
Collections.singletonList(userComponent));
DescribeClientQuotasResult result = admin.describeClientQuotas(filter);
Map<ClientQuotaEntity, Map<String, Double>> quotas = result.entities().get();
for (Map.Entry<ClientQuotaEntity, Map<String, Double>> entry : quotas.entrySet()) {
System.out.println("Entity: " + entry.getKey().entries());
for (Map.Entry<String, Double> quota : entry.getValue().entrySet()) {
System.out.println(" " + quota.getKey() + ": " + quota.getValue());
}
}
}
// Admin interface method
AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> entries);
AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> entries,
AlterClientQuotasOptions options);
ClientQuotaAlteration:
package org.apache.kafka.common.quota;
public class ClientQuotaAlteration {
public ClientQuotaAlteration(ClientQuotaEntity entity, Collection<Op> ops);
public ClientQuotaEntity entity();
public Collection<Op> ops();
public static class Op {
// Set value to update, null to clear
public Op(String key, Double value);
public String key();
public Double value();
}
}
AlterClientQuotasResult:
package org.apache.kafka.clients.admin;
public class AlterClientQuotasResult {
Map<ClientQuotaEntity, KafkaFuture<Void>> values();
KafkaFuture<Void> all();
}
Usage Example:
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.quota.*;
import java.util.*;
try (Admin admin = Admin.create(props)) {
// Set quotas for a specific user
Map<String, String> entityEntries = new HashMap<>();
entityEntries.put(ClientQuotaEntity.USER, "user1");
ClientQuotaEntity entity = new ClientQuotaEntity(entityEntries);
// Set producer and consumer byte rate quotas
Collection<ClientQuotaAlteration.Op> ops = Arrays.asList(
new ClientQuotaAlteration.Op("producer_byte_rate", 1024000.0), // 1MB/s
new ClientQuotaAlteration.Op("consumer_byte_rate", 2048000.0) // 2MB/s
);
ClientQuotaAlteration alteration = new ClientQuotaAlteration(entity, ops);
AlterClientQuotasResult result = admin.alterClientQuotas(
Collections.singletonList(alteration));
result.all().get();
System.out.println("Client quotas updated successfully");
}
// Admin interface methods
DescribeUserScramCredentialsResult describeUserScramCredentials();
DescribeUserScramCredentialsResult describeUserScramCredentials(List<String> users);
DescribeUserScramCredentialsResult describeUserScramCredentials(List<String> users,
DescribeUserScramCredentialsOptions options);
DescribeUserScramCredentialsResult:
package org.apache.kafka.clients.admin;
public class DescribeUserScramCredentialsResult {
// All user descriptions
KafkaFuture<Map<String, UserScramCredentialsDescription>> all();
// List of users with credentials
KafkaFuture<List<String>> users();
// Description for specific user
KafkaFuture<UserScramCredentialsDescription> description(String userName);
}
UserScramCredentialsDescription:
package org.apache.kafka.clients.admin;
public class UserScramCredentialsDescription {
public String name();
public List<ScramCredentialInfo> credentialInfos();
}
ScramCredentialInfo:
package org.apache.kafka.clients.admin;
public class ScramCredentialInfo {
public ScramCredentialInfo(ScramMechanism mechanism, int iterations);
public ScramMechanism mechanism();
public int iterations();
}
ScramMechanism:
package org.apache.kafka.clients.admin;
public enum ScramMechanism {
SCRAM_SHA_256,
SCRAM_SHA_512;
public byte type();
public String mechanismName();
}
Usage Example:
import org.apache.kafka.clients.admin.*;
import java.util.*;
try (Admin admin = Admin.create(props)) {
// Describe all users with SCRAM credentials
DescribeUserScramCredentialsResult result =
admin.describeUserScramCredentials();
Map<String, UserScramCredentialsDescription> descriptions = result.all().get();
for (Map.Entry<String, UserScramCredentialsDescription> entry : descriptions.entrySet()) {
System.out.println("User: " + entry.getKey());
for (ScramCredentialInfo info : entry.getValue().credentialInfos()) {
System.out.println(" Mechanism: " + info.mechanism() +
", Iterations: " + info.iterations());
}
}
}
// Admin interface method
AlterUserScramCredentialsResult alterUserScramCredentials(
List<UserScramCredentialAlteration> alterations);
AlterUserScramCredentialsResult alterUserScramCredentials(
List<UserScramCredentialAlteration> alterations,
AlterUserScramCredentialsOptions options);
UserScramCredentialUpsertion:
package org.apache.kafka.clients.admin;
public class UserScramCredentialUpsertion extends UserScramCredentialAlteration {
// Constructor with auto-generated salt
public UserScramCredentialUpsertion(String user,
ScramCredentialInfo credentialInfo,
String password);
// Constructor with explicit salt
public UserScramCredentialUpsertion(String user,
ScramCredentialInfo credentialInfo,
byte[] password,
byte[] salt);
public ScramCredentialInfo credentialInfo();
public byte[] salt();
public byte[] password();
}
UserScramCredentialDeletion:
package org.apache.kafka.clients.admin;
public class UserScramCredentialDeletion extends UserScramCredentialAlteration {
public UserScramCredentialDeletion(String user, ScramMechanism mechanism);
public ScramMechanism mechanism();
}
AlterUserScramCredentialsResult:
package org.apache.kafka.clients.admin;
public class AlterUserScramCredentialsResult {
Map<String, KafkaFuture<Void>> values();
KafkaFuture<Void> all();
}
Usage Example:
import org.apache.kafka.clients.admin.*;
import java.util.*;
try (Admin admin = Admin.create(props)) {
// Create or update SCRAM credentials for a user
ScramCredentialInfo credentialInfo =
new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096);
UserScramCredentialUpsertion upsertion =
new UserScramCredentialUpsertion("user1", credentialInfo, "password123");
AlterUserScramCredentialsResult result =
admin.alterUserScramCredentials(Collections.singletonList(upsertion));
result.all().get();
System.out.println("SCRAM credentials updated successfully");
// Delete SCRAM credentials for a user
UserScramCredentialDeletion deletion =
new UserScramCredentialDeletion("user2", ScramMechanism.SCRAM_SHA_256);
AlterUserScramCredentialsResult deleteResult =
admin.alterUserScramCredentials(Collections.singletonList(deletion));
deleteResult.all().get();
System.out.println("SCRAM credentials deleted successfully");
}
// Admin interface method
CreateDelegationTokenResult createDelegationToken();
CreateDelegationTokenResult createDelegationToken(CreateDelegationTokenOptions options);
CreateDelegationTokenOptions:
package org.apache.kafka.clients.admin;
public class CreateDelegationTokenOptions extends AbstractOptions<CreateDelegationTokenOptions> {
public CreateDelegationTokenOptions maxLifeTimeMs(long maxLifeTimeMs);
public CreateDelegationTokenOptions renewers(List<KafkaPrincipal> renewers);
}
CreateDelegationTokenResult:
package org.apache.kafka.clients.admin;
public class CreateDelegationTokenResult {
KafkaFuture<DelegationToken> delegationToken();
}
DelegationToken:
package org.apache.kafka.common.security.token.delegation;
public class DelegationToken {
public TokenInformation tokenInfo();
public byte[] hmac();
public String hmacAsBase64String();
}
TokenInformation:
package org.apache.kafka.common.security.token.delegation;
public class TokenInformation {
public KafkaPrincipal owner();
public List<KafkaPrincipal> renewers();
public long issueTimestamp();
public long maxTimestamp();
public long expiryTimestamp();
public String tokenId();
}
Usage Example:
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.security.token.delegation.*;
import java.util.*;
try (Admin admin = Admin.create(props)) {
// Create a delegation token
CreateDelegationTokenOptions options = new CreateDelegationTokenOptions()
.maxLifeTimeMs(86400000); // 1 day
CreateDelegationTokenResult result = admin.createDelegationToken(options);
DelegationToken token = result.delegationToken().get();
System.out.println("Token ID: " + token.tokenInfo().tokenId());
System.out.println("HMAC: " + token.hmacAsBase64String());
System.out.println("Owner: " + token.tokenInfo().owner());
System.out.println("Expiry: " + token.tokenInfo().expiryTimestamp());
}
// Admin interface method
RenewDelegationTokenResult renewDelegationToken(byte[] hmac);
RenewDelegationTokenResult renewDelegationToken(byte[] hmac,
RenewDelegationTokenOptions options);
RenewDelegationTokenResult:
package org.apache.kafka.clients.admin;
public class RenewDelegationTokenResult {
KafkaFuture<Long> expiryTimestamp();
}
Usage Example:
import org.apache.kafka.clients.admin.*;
import java.util.*;
try (Admin admin = Admin.create(props)) {
byte[] hmac = ...; // HMAC from token creation
RenewDelegationTokenResult result = admin.renewDelegationToken(hmac);
long newExpiryTime = result.expiryTimestamp().get();
System.out.println("Token renewed. New expiry: " + newExpiryTime);
}
// Admin interface method
ExpireDelegationTokenResult expireDelegationToken(byte[] hmac);
ExpireDelegationTokenResult expireDelegationToken(byte[] hmac,
ExpireDelegationTokenOptions options);
ExpireDelegationTokenOptions:
package org.apache.kafka.clients.admin;
public class ExpireDelegationTokenOptions extends AbstractOptions<ExpireDelegationTokenOptions> {
public ExpireDelegationTokenOptions expiryTimePeriodMs(long expiryTimePeriodMs);
}
ExpireDelegationTokenResult:
package org.apache.kafka.clients.admin;
public class ExpireDelegationTokenResult {
KafkaFuture<Long> expiryTimestamp();
}
Usage Example:
import org.apache.kafka.clients.admin.*;
import java.util.*;
try (Admin admin = Admin.create(props)) {
byte[] hmac = ...; // HMAC from token creation
// Expire immediately
ExpireDelegationTokenOptions options = new ExpireDelegationTokenOptions()
.expiryTimePeriodMs(0);
ExpireDelegationTokenResult result = admin.expireDelegationToken(hmac, options);
long expiryTime = result.expiryTimestamp().get();
System.out.println("Token expired at: " + expiryTime);
}
// Admin interface method
DescribeDelegationTokenResult describeDelegationToken();
DescribeDelegationTokenResult describeDelegationToken(DescribeDelegationTokenOptions options);
DescribeDelegationTokenOptions:
package org.apache.kafka.clients.admin;
public class DescribeDelegationTokenOptions extends AbstractOptions<DescribeDelegationTokenOptions> {
public DescribeDelegationTokenOptions owners(List<KafkaPrincipal> owners);
}
DescribeDelegationTokenResult:
package org.apache.kafka.clients.admin;
public class DescribeDelegationTokenResult {
KafkaFuture<List<DelegationToken>> delegationTokens();
}
Usage Example:
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.security.token.delegation.*;
import java.util.*;
try (Admin admin = Admin.create(props)) {
DescribeDelegationTokenResult result = admin.describeDelegationToken();
List<DelegationToken> tokens = result.delegationTokens().get();
for (DelegationToken token : tokens) {
TokenInformation info = token.tokenInfo();
System.out.println("Token ID: " + info.tokenId());
System.out.println(" Owner: " + info.owner());
System.out.println(" Expiry: " + info.expiryTimestamp());
System.out.println(" Renewers: " + info.renewers());
}
}
// Admin interface method
DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas);
DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas,
DescribeReplicaLogDirsOptions options);
TopicPartitionReplica:
package org.apache.kafka.common;
public class TopicPartitionReplica {
public TopicPartitionReplica(String topic, int partition, int brokerId);
public String topic();
public int partition();
public int brokerId();
}
DescribeReplicaLogDirsResult:
package org.apache.kafka.clients.admin;
public class DescribeReplicaLogDirsResult {
Map<TopicPartitionReplica, KafkaFuture<ReplicaLogDirInfo>> values();
KafkaFuture<Map<TopicPartitionReplica, ReplicaLogDirInfo>> all();
}
ReplicaLogDirInfo (a static nested class of DescribeReplicaLogDirsResult):
package org.apache.kafka.clients.admin;
public static class ReplicaLogDirInfo {
public String getCurrentReplicaLogDir();
public long getCurrentReplicaOffsetLag();
public String getFutureReplicaLogDir();
public long getFutureReplicaOffsetLag();
}
Usage Example:
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartitionReplica;
import java.util.*;
try (Admin admin = Admin.create(props)) {
// Describe log directories for specific replicas
TopicPartitionReplica replica1 = new TopicPartitionReplica("my-topic", 0, 1);
TopicPartitionReplica replica2 = new TopicPartitionReplica("my-topic", 1, 1);
Collection<TopicPartitionReplica> replicas = Arrays.asList(replica1, replica2);
DescribeReplicaLogDirsResult result = admin.describeReplicaLogDirs(replicas);
Map<TopicPartitionReplica, DescribeReplicaLogDirsResult.ReplicaLogDirInfo> logDirs =
result.all().get();
for (Map.Entry<TopicPartitionReplica, DescribeReplicaLogDirsResult.ReplicaLogDirInfo> entry :
logDirs.entrySet()) {
TopicPartitionReplica replica = entry.getKey();
DescribeReplicaLogDirsResult.ReplicaLogDirInfo info = entry.getValue();
System.out.println("Replica: " + replica.topic() + "-" + replica.partition() +
" on broker " + replica.brokerId());
System.out.println(" Current log dir: " + info.getCurrentReplicaLogDir());
System.out.println(" Current offset lag: " + info.getCurrentReplicaOffsetLag());
if (info.getFutureReplicaLogDir() != null) {
System.out.println(" Future log dir: " + info.getFutureReplicaLogDir());
System.out.println(" Future offset lag: " + info.getFutureReplicaOffsetLag());
}
}
}
// Admin interface method
AlterReplicaLogDirsResult alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment);
AlterReplicaLogDirsResult alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment,
AlterReplicaLogDirsOptions options);
AlterReplicaLogDirsResult:
package org.apache.kafka.clients.admin;
public class AlterReplicaLogDirsResult {
Map<TopicPartitionReplica, KafkaFuture<Void>> values();
KafkaFuture<Void> all();
}
Usage Example:
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartitionReplica;
import java.util.*;
try (Admin admin = Admin.create(props)) {
// Move replicas to specific log directories
Map<TopicPartitionReplica, String> replicaAssignment = new HashMap<>();
TopicPartitionReplica replica1 = new TopicPartitionReplica("my-topic", 0, 1);
replicaAssignment.put(replica1, "/data/kafka-logs-1");
TopicPartitionReplica replica2 = new TopicPartitionReplica("my-topic", 1, 1);
replicaAssignment.put(replica2, "/data/kafka-logs-2");
AlterReplicaLogDirsResult result = admin.alterReplicaLogDirs(replicaAssignment);
result.all().get();
System.out.println("Replica log directories altered successfully");
}
// Admin interface method
AlterPartitionReassignmentsResult alterPartitionReassignments(
Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments);
AlterPartitionReassignmentsResult alterPartitionReassignments(
Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments,
AlterPartitionReassignmentsOptions options);
NewPartitionReassignment:
package org.apache.kafka.clients.admin;
public class NewPartitionReassignment {
public NewPartitionReassignment(List<Integer> targetReplicas);
public List<Integer> targetReplicas();
}
AlterPartitionReassignmentsResult:
package org.apache.kafka.clients.admin;
public class AlterPartitionReassignmentsResult {
Map<TopicPartition, KafkaFuture<Void>> values();
KafkaFuture<Void> all();
}
Usage Example:
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
try (Admin admin = Admin.create(props)) {
// Reassign partition replicas to different brokers
TopicPartition partition = new TopicPartition("my-topic", 0);
// Move partition 0 to brokers 1, 2, and 3
NewPartitionReassignment reassignment =
new NewPartitionReassignment(Arrays.asList(1, 2, 3));
Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments = new HashMap<>();
reassignments.put(partition, Optional.of(reassignment));
AlterPartitionReassignmentsResult result =
admin.alterPartitionReassignments(reassignments);
result.all().get();
System.out.println("Partition reassignment initiated");
// Cancel ongoing reassignment (pass empty Optional)
Map<TopicPartition, Optional<NewPartitionReassignment>> cancelReassignments = new HashMap<>();
cancelReassignments.put(partition, Optional.empty());
AlterPartitionReassignmentsResult cancelResult =
admin.alterPartitionReassignments(cancelReassignments);
cancelResult.all().get();
System.out.println("Partition reassignment cancelled");
}
// Admin interface method
ListPartitionReassignmentsResult listPartitionReassignments();
ListPartitionReassignmentsResult listPartitionReassignments(Set<TopicPartition> partitions);
ListPartitionReassignmentsResult listPartitionReassignments(Set<TopicPartition> partitions,
ListPartitionReassignmentsOptions options);
ListPartitionReassignmentsResult:
package org.apache.kafka.clients.admin;
public class ListPartitionReassignmentsResult {
KafkaFuture<Map<TopicPartition, PartitionReassignment>> reassignments();
}
PartitionReassignment:
package org.apache.kafka.clients.admin;
public class PartitionReassignment {
public List<Integer> replicas();
public List<Integer> addingReplicas();
public List<Integer> removingReplicas();
}
Usage Example:
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
try (Admin admin = Admin.create(props)) {
// List all ongoing partition reassignments
ListPartitionReassignmentsResult result = admin.listPartitionReassignments();
Map<TopicPartition, PartitionReassignment> reassignments =
result.reassignments().get();
if (reassignments.isEmpty()) {
System.out.println("No ongoing partition reassignments");
} else {
for (Map.Entry<TopicPartition, PartitionReassignment> entry : reassignments.entrySet()) {
TopicPartition partition = entry.getKey();
PartitionReassignment reassignment = entry.getValue();
System.out.println("Partition: " + partition);
System.out.println(" Current replicas: " + reassignment.replicas());
System.out.println(" Adding replicas: " + reassignment.addingReplicas());
System.out.println(" Removing replicas: " + reassignment.removingReplicas());
}
}
}
Manage and inspect log directories on brokers. Use these operations to query disk usage and move replicas between log directories on the same broker.
// Admin interface method
DescribeLogDirsResult describeLogDirs(Collection<Integer> brokers);
DescribeLogDirsResult describeLogDirs(Collection<Integer> brokers, DescribeLogDirsOptions options);
DescribeLogDirsResult:
package org.apache.kafka.clients.admin;
public class DescribeLogDirsResult {
Map<Integer, KafkaFuture<Map<String, LogDirDescription>>> descriptions();
KafkaFuture<Map<Integer, Map<String, LogDirDescription>>> allDescriptions();
}
LogDirDescription:
package org.apache.kafka.clients.admin;
public class LogDirDescription {
ApiException error();
Map<TopicPartition, ReplicaInfo> replicaInfos();
OptionalLong totalBytes();
OptionalLong usableBytes();
}
ReplicaInfo:
package org.apache.kafka.clients.admin;
public class ReplicaInfo {
long size();
long offsetLag();
boolean isFuture();
}
Usage Example:
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
try (Admin admin = Admin.create(props)) {
// Query log directories for brokers 1 and 2
DescribeLogDirsResult result = admin.describeLogDirs(Arrays.asList(1, 2));
Map<Integer, Map<String, LogDirDescription>> allLogDirs = result.allDescriptions().get();
for (Map.Entry<Integer, Map<String, LogDirDescription>> brokerEntry : allLogDirs.entrySet()) {
int brokerId = brokerEntry.getKey();
System.out.println("Broker " + brokerId + ":");
for (Map.Entry<String, LogDirDescription> dirEntry : brokerEntry.getValue().entrySet()) {
String path = dirEntry.getKey();
LogDirDescription desc = dirEntry.getValue();
System.out.println(" Log directory: " + path);
if (desc.error() != null) {
System.out.println(" Error: " + desc.error().getMessage());
} else {
System.out.println(" Total bytes: " + desc.totalBytes().orElse(-1));
System.out.println(" Usable bytes: " + desc.usableBytes().orElse(-1));
System.out.println(" Partitions: " + desc.replicaInfos().size());
for (Map.Entry<TopicPartition, ReplicaInfo> replicaEntry :
desc.replicaInfos().entrySet()) {
TopicPartition tp = replicaEntry.getKey();
ReplicaInfo info = replicaEntry.getValue();
System.out.println(" " + tp + ": size=" + info.size() +
", lag=" + info.offsetLag() + ", future=" + info.isFuture());
}
}
}
}
}
Trigger leader elections for topic partitions. Use this operation to rebalance leadership onto preferred replicas or, via unclean election, to recover partitions whose in-sync replicas are all offline.
// Admin interface method
ElectLeadersResult electLeaders(ElectionType electionType, Set<TopicPartition> partitions);
ElectLeadersResult electLeaders(ElectionType electionType, Set<TopicPartition> partitions,
ElectLeadersOptions options);
ElectionType:
package org.apache.kafka.common;
public enum ElectionType {
PREFERRED, // Elect the preferred leader (first replica in replica list)
UNCLEAN // Allow election of out-of-sync replica if no ISR available
}
ElectLeadersResult:
package org.apache.kafka.clients.admin;
public class ElectLeadersResult {
KafkaFuture<Map<TopicPartition, Optional<Throwable>>> partitions();
KafkaFuture<Void> all();
}
Usage Example:
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
try (Admin admin = Admin.create(props)) {
// Trigger preferred leader election for specific partitions
Set<TopicPartition> partitions = new HashSet<>();
partitions.add(new TopicPartition("my-topic", 0));
partitions.add(new TopicPartition("my-topic", 1));
ElectLeadersResult result = admin.electLeaders(
ElectionType.PREFERRED,
partitions
);
Map<TopicPartition, Optional<Throwable>> results = result.partitions().get();
for (Map.Entry<TopicPartition, Optional<Throwable>> entry : results.entrySet()) {
if (entry.getValue().isPresent()) {
System.err.println("Failed to elect leader for " + entry.getKey() +
": " + entry.getValue().get().getMessage());
} else {
System.out.println("Successfully elected leader for " + entry.getKey());
}
}
// Trigger preferred leader election for all partitions (pass null)
ElectLeadersResult allResult = admin.electLeaders(ElectionType.PREFERRED, null);
allResult.all().get();
System.out.println("Triggered preferred leader election for all partitions");
// Unclean leader election (use with caution - may result in data loss)
ElectLeadersResult uncleanResult = admin.electLeaders(
ElectionType.UNCLEAN,
Collections.singleton(new TopicPartition("critical-topic", 0))
);
uncleanResult.all().get();
}
Delete records from topic partitions. This operation sets the low watermark for a partition, making older records unavailable to consumers.
// Admin interface method
DeleteRecordsResult deleteRecords(Map<TopicPartition, RecordsToDelete> recordsToDelete);
DeleteRecordsResult deleteRecords(Map<TopicPartition, RecordsToDelete> recordsToDelete,
DeleteRecordsOptions options);
RecordsToDelete:
package org.apache.kafka.clients.admin;
public class RecordsToDelete {
static RecordsToDelete beforeOffset(long offset);
long beforeOffset();
}
DeleteRecordsResult:
package org.apache.kafka.clients.admin;
public class DeleteRecordsResult {
Map<TopicPartition, KafkaFuture<DeletedRecords>> lowWatermarks();
KafkaFuture<Void> all();
}
DeletedRecords:
package org.apache.kafka.clients.admin;
public class DeletedRecords {
long lowWatermark();
}
Usage Example:
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
try (Admin admin = Admin.create(props)) {
// Delete all records before offset 1000 on partition 0
TopicPartition partition = new TopicPartition("my-topic", 0);
Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>();
recordsToDelete.put(partition, RecordsToDelete.beforeOffset(1000L));
DeleteRecordsResult result = admin.deleteRecords(recordsToDelete);
// Check the new low watermark for each partition
Map<TopicPartition, KafkaFuture<DeletedRecords>> results = result.lowWatermarks();
for (Map.Entry<TopicPartition, KafkaFuture<DeletedRecords>> entry : results.entrySet()) {
DeletedRecords deleted = entry.getValue().get();
System.out.println("New low watermark for " + entry.getKey() +
": " + deleted.lowWatermark());
}
// Or wait for all to complete
result.all().get();
System.out.println("Records deleted successfully");
}
// Admin interface method
DescribeClusterResult describeCluster();
DescribeClusterResult describeCluster(DescribeClusterOptions options);
DescribeClusterResult:
package org.apache.kafka.clients.admin;
public class DescribeClusterResult {
KafkaFuture<Collection<Node>> nodes();
KafkaFuture<Node> controller();
KafkaFuture<String> clusterId();
KafkaFuture<Set<AclOperation>> authorizedOperations();
}
Usage Example:
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.Node;
import java.util.*;
try (Admin admin = Admin.create(props)) {
DescribeClusterResult result = admin.describeCluster();
String clusterId = result.clusterId().get();
Node controller = result.controller().get();
Collection<Node> nodes = result.nodes().get();
System.out.println("Cluster ID: " + clusterId);
System.out.println("Controller: " + controller.id() + " @ " +
controller.host() + ":" + controller.port());
System.out.println("Nodes: " + nodes.size());
for (Node node : nodes) {
System.out.println(" Node " + node.id() + ": " +
node.host() + ":" + node.port());
}
}
// Admin interface method
ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets);
ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets,
ListOffsetsOptions options);
OffsetSpec:
package org.apache.kafka.clients.admin;
public class OffsetSpec {
public static OffsetSpec latest();
public static OffsetSpec earliest();
public static OffsetSpec forTimestamp(long timestamp);
public static OffsetSpec maxTimestamp();
public static OffsetSpec earliestLocal();
public static OffsetSpec latestTiered();
}
Usage Example:
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
try (Admin admin = Admin.create(props)) {
TopicPartition partition = new TopicPartition("my-topic", 0);
Map<TopicPartition, OffsetSpec> offsetSpecs = new HashMap<>();
offsetSpecs.put(partition, OffsetSpec.latest());
ListOffsetsResult result = admin.listOffsets(offsetSpecs);
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsets =
result.all().get();
for (Map.Entry<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> entry :
offsets.entrySet()) {
System.out.println("Partition: " + entry.getKey() +
", Offset: " + entry.getValue().offset() +
", Timestamp: " + entry.getValue().timestamp());
}
}
// Admin interface method
DescribeAclsResult describeAcls(AclBindingFilter filter);
DescribeAclsResult describeAcls(AclBindingFilter filter, DescribeAclsOptions options);
// Admin interface method
CreateAclsResult createAcls(Collection<AclBinding> acls);
CreateAclsResult createAcls(Collection<AclBinding> acls, CreateAclsOptions options);
// Admin interface method
DeleteAclsResult deleteAcls(Collection<AclBindingFilter> filters);
DeleteAclsResult deleteAcls(Collection<AclBindingFilter> filters,
DeleteAclsOptions options);
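Usage Example (a sketch combining the three ACL operations; assumes the brokers are configured with an authorizer, and the principal and host values are illustrative):
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.acl.*;
import org.apache.kafka.common.resource.*;
import java.util.*;
try (Admin admin = Admin.create(props)) {
    // Grant READ on a topic to a principal
    AclBinding binding = new AclBinding(
        new ResourcePattern(ResourceType.TOPIC, "my-topic", PatternType.LITERAL),
        new AccessControlEntry("User:alice", "*", AclOperation.READ, AclPermissionType.ALLOW));
    admin.createAcls(Collections.singletonList(binding)).all().get();
    // List all ACLs
    Collection<AclBinding> acls = admin.describeAcls(AclBindingFilter.ANY).values().get();
    acls.forEach(System.out::println);
    // Remove the binding again via its filter form
    admin.deleteAcls(Collections.singletonList(binding.toFilter())).all().get();
}
// Admin interface method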
DescribeTransactionsResult describeTransactions(Collection<String> transactionalIds);
DescribeTransactionsResult describeTransactions(Collection<String> transactionalIds,
DescribeTransactionsOptions options);
TransactionDescription:
package org.apache.kafka.clients.admin;
public class TransactionDescription {
public String transactionalId();
public long producerId();
public int producerEpoch();
public TransactionState state();
public Node coordinator();
public Set<TopicPartition> topicPartitions();
}
TransactionState:
package org.apache.kafka.clients.admin;
public enum TransactionState {
ONGOING,
PREPARE_ABORT,
PREPARE_COMMIT,
COMPLETE_ABORT,
COMPLETE_COMMIT,
EMPTY,
PREPARE_EPOCH_FENCE,
UNKNOWN
}
// Admin interface method
ListTransactionsResult listTransactions();
ListTransactionsResult listTransactions(ListTransactionsOptions options);
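Usage Example (a sketch: list the cluster's transactional IDs, then describe each transaction):
import org.apache.kafka.clients.admin.*;
import java.util.*;
try (Admin admin = Admin.create(props)) {
    // Discover transactional IDs known to the cluster
    Collection<TransactionListing> listings = admin.listTransactions().all().get();
    List<String> ids = new ArrayList<>();
    for (TransactionListing listing : listings) {
        ids.add(listing.transactionalId());
    }
    // Describe each transaction's current state
    Map<String, TransactionDescription> descriptions =
        admin.describeTransactions(ids).all().get();
    descriptions.forEach((id, desc) ->
        System.out.println(id + ": state=" + desc.state() +
            ", coordinator=" + desc.coordinator().id()));
}
// Admin interface method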
AbortTransactionResult abortTransaction(AbortTransactionSpec spec);
AbortTransactionResult abortTransaction(AbortTransactionSpec spec,
AbortTransactionOptions options);
AbortTransactionSpec:
package org.apache.kafka.clients.admin;
public class AbortTransactionSpec {
public AbortTransactionSpec(TopicPartition topicPartition,
long producerId,
short producerEpoch,
int coordinatorEpoch);
}
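Usage Example (a sketch; the producer ID, producer epoch, and coordinator epoch are hypothetical values that would typically come from describeProducers()):
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartition;
try (Admin admin = Admin.create(props)) {
    AbortTransactionSpec spec = new AbortTransactionSpec(
        new TopicPartition("my-topic", 0),
        12345L,     // producer ID (hypothetical)
        (short) 0,  // producer epoch
        0);         // coordinator epoch
    admin.abortTransaction(spec).all().get();
    System.out.println("Transaction aborted");
}
// Admin interface method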
TerminateTransactionResult terminateTransaction(TerminateTransactionSpec spec);
TerminateTransactionResult terminateTransaction(TerminateTransactionSpec spec,
TerminateTransactionOptions options);
TerminateTransactionSpec:
package org.apache.kafka.clients.admin;
public class TerminateTransactionSpec {
public TerminateTransactionSpec(String transactionalId,
long producerId,
short producerEpoch,
int coordinatorEpoch);
}
Usage Example:
import org.apache.kafka.clients.admin.*;
try (Admin admin = Admin.create(props)) {
// Terminate a hanging transaction
TerminateTransactionSpec spec = new TerminateTransactionSpec(
"my-transactional-id",
12345L, // Producer ID
(short) 0, // Producer epoch
0 // Coordinator epoch
);
TerminateTransactionResult result = admin.terminateTransaction(spec);
result.all().get();
System.out.println("Transaction terminated");
}
Manage share consumer groups, a new feature introduced in Kafka 4.0 that allows multiple consumers to share partition consumption with automatic record acknowledgment and retry handling.
// Admin interface method
DescribeShareGroupsResult describeShareGroups(Collection<String> groupIds);
DescribeShareGroupsResult describeShareGroups(Collection<String> groupIds,
DescribeShareGroupsOptions options);
Usage Example:
import org.apache.kafka.clients.admin.*;
import java.util.*;
try (Admin admin = Admin.create(props)) {
DescribeShareGroupsResult result = admin.describeShareGroups(
Arrays.asList("share-group-1"));
Map<String, ShareGroupDescription> descriptions = result.all().get();
for (Map.Entry<String, ShareGroupDescription> entry : descriptions.entrySet()) {
ShareGroupDescription desc = entry.getValue();
System.out.println("Share Group: " + desc.groupId());
System.out.println(" State: " + desc.state());
System.out.println(" Members: " + desc.members().size());
}
}
// Admin interface method
ListShareGroupOffsetsResult listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec> groupSpecs);
ListShareGroupOffsetsResult listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec> groupSpecs,
ListShareGroupOffsetsOptions options);
Usage Example:
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
try (Admin admin = Admin.create(props)) {
Map<String, ListShareGroupOffsetsSpec> specs = new HashMap<>();
specs.put("share-group-1", new ListShareGroupOffsetsSpec());
ListShareGroupOffsetsResult result = admin.listShareGroupOffsets(specs);
Map<String, Map<TopicPartition, Long>> offsets = result.all().get();
for (Map.Entry<String, Map<TopicPartition, Long>> groupEntry : offsets.entrySet()) {
System.out.println("Group: " + groupEntry.getKey());
for (Map.Entry<TopicPartition, Long> offsetEntry : groupEntry.getValue().entrySet()) {
System.out.println(" " + offsetEntry.getKey() + ": " + offsetEntry.getValue());
}
}
}
// Admin interface method
AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId,
Map<TopicPartition, Long> offsets);
AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId,
Map<TopicPartition, Long> offsets,
AlterShareGroupOffsetsOptions options);
Usage Example:
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
try (Admin admin = Admin.create(props)) {
Map<TopicPartition, Long> newOffsets = new HashMap<>();
newOffsets.put(new TopicPartition("my-topic", 0), 100L);
newOffsets.put(new TopicPartition("my-topic", 1), 200L);
AlterShareGroupOffsetsResult result =
admin.alterShareGroupOffsets("share-group-1", newOffsets);
result.all().get();
System.out.println("Share group offsets updated");
}
// Admin interface method
DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds);
DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds,
DeleteShareGroupsOptions options);
// Admin interface method
DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId,
Set<TopicPartition> partitions);
DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId,
Set<TopicPartition> partitions,
DeleteShareGroupOffsetsOptions options);
Usage Example:
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
try (Admin admin = Admin.create(props)) {
// Delete offsets for specific partitions
Set<TopicPartition> partitions = new HashSet<>();
partitions.add(new TopicPartition("my-topic", 0));
DeleteShareGroupOffsetsResult result =
admin.deleteShareGroupOffsets("share-group-1", partitions);
result.all().get();
System.out.println("Share group offsets deleted");
// Delete entire share group
DeleteShareGroupsResult deleteResult =
admin.deleteShareGroups(Collections.singletonList("share-group-1"));
deleteResult.all().get();
System.out.println("Share group deleted");
}
Manage Kafka Streams application groups. These operations are similar to consumer group operations but specific to Kafka Streams applications.
// Admin interface method
DescribeStreamsGroupsResult describeStreamsGroups(Collection<String> groupIds);
DescribeStreamsGroupsResult describeStreamsGroups(Collection<String> groupIds,
DescribeStreamsGroupsOptions options);
Usage Example:
import org.apache.kafka.clients.admin.*;
import java.util.*;
try (Admin admin = Admin.create(props)) {
DescribeStreamsGroupsResult result = admin.describeStreamsGroups(
Arrays.asList("streams-app-1"));
Map<String, StreamsGroupDescription> descriptions = result.all().get();
for (Map.Entry<String, StreamsGroupDescription> entry : descriptions.entrySet()) {
StreamsGroupDescription desc = entry.getValue();
System.out.println("Streams Group: " + desc.groupId());
System.out.println(" State: " + desc.state());
System.out.println(" Members: " + desc.members().size());
}
}
// Admin interface method
ListStreamsGroupOffsetsResult listStreamsGroupOffsets(Map<String, ListStreamsGroupOffsetsSpec> groupSpecs);
ListStreamsGroupOffsetsResult listStreamsGroupOffsets(Map<String, ListStreamsGroupOffsetsSpec> groupSpecs,
ListStreamsGroupOffsetsOptions options);
Usage Example:
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
try (Admin admin = Admin.create(props)) {
Map<String, ListStreamsGroupOffsetsSpec> specs = new HashMap<>();
specs.put("streams-app-1", new ListStreamsGroupOffsetsSpec());
ListStreamsGroupOffsetsResult result = admin.listStreamsGroupOffsets(specs);
Map<String, Map<TopicPartition, OffsetAndMetadata>> offsets = result.all().get();
for (Map.Entry<String, Map<TopicPartition, OffsetAndMetadata>> groupEntry :
offsets.entrySet()) {
System.out.println("Group: " + groupEntry.getKey());
for (Map.Entry<TopicPartition, OffsetAndMetadata> offsetEntry :
groupEntry.getValue().entrySet()) {
System.out.println(" " + offsetEntry.getKey() +
": offset=" + offsetEntry.getValue().offset() +
", metadata=" + offsetEntry.getValue().metadata());
}
}
}
// Admin interface method
AlterStreamsGroupOffsetsResult alterStreamsGroupOffsets(String groupId,
Map<TopicPartition, OffsetAndMetadata> offsets);
AlterStreamsGroupOffsetsResult alterStreamsGroupOffsets(String groupId,
Map<TopicPartition, OffsetAndMetadata> offsets,
AlterStreamsGroupOffsetsOptions options);
Usage Example:
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
try (Admin admin = Admin.create(props)) {
Map<TopicPartition, OffsetAndMetadata> newOffsets = new HashMap<>();
newOffsets.put(new TopicPartition("input-topic", 0), new OffsetAndMetadata(100L));
newOffsets.put(new TopicPartition("input-topic", 1), new OffsetAndMetadata(200L));
AlterStreamsGroupOffsetsResult result =
admin.alterStreamsGroupOffsets("streams-app-1", newOffsets);
result.all().get();
System.out.println("Streams group offsets updated");
}
// Admin interface method
DeleteStreamsGroupsResult deleteStreamsGroups(Collection<String> groupIds);
DeleteStreamsGroupsResult deleteStreamsGroups(Collection<String> groupIds,
DeleteStreamsGroupsOptions options);
// Admin interface method
DeleteStreamsGroupOffsetsResult deleteStreamsGroupOffsets(String groupId,
Set<TopicPartition> partitions);
DeleteStreamsGroupOffsetsResult deleteStreamsGroupOffsets(String groupId,
Set<TopicPartition> partitions,
DeleteStreamsGroupOffsetsOptions options);
Usage Example:
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
try (Admin admin = Admin.create(props)) {
// Delete offsets for specific partitions
Set<TopicPartition> partitions = new HashSet<>();
partitions.add(new TopicPartition("input-topic", 0));
DeleteStreamsGroupOffsetsResult result =
admin.deleteStreamsGroupOffsets("streams-app-1", partitions);
result.all().get();
System.out.println("Streams group offsets deleted");
// Delete entire streams group
DeleteStreamsGroupsResult deleteResult =
admin.deleteStreamsGroups(Collections.singletonList("streams-app-1"));
deleteResult.all().get();
System.out.println("Streams group deleted");
}
// Admin interface method
DescribeProducersResult describeProducers(Collection<TopicPartition> partitions);
DescribeProducersResult describeProducers(Collection<TopicPartition> partitions,
DescribeProducersOptions options);
// Admin interface method
FenceProducersResult fenceProducers(Collection<String> transactionalIds);
FenceProducersResult fenceProducers(Collection<String> transactionalIds,
FenceProducersOptions options);
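Usage Example (a sketch: inspect the active producers on a partition, then fence a zombie transactional producer; the transactional ID is illustrative):
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
try (Admin admin = Admin.create(props)) {
    TopicPartition partition = new TopicPartition("my-topic", 0);
    DescribeProducersResult result =
        admin.describeProducers(Collections.singletonList(partition));
    DescribeProducersResult.PartitionProducerState state =
        result.partitionResult(partition).get();
    for (ProducerState producer : state.activeProducers()) {
        System.out.println("Producer ID: " + producer.producerId() +
            ", epoch: " + producer.producerEpoch());
    }
    // Fence the producer associated with a transactional ID
    admin.fenceProducers(Collections.singletonList("my-transactional-id")).all().get();
}
All options classes extend AbstractOptions<T> with timeout configuration: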
package org.apache.kafka.clients.admin;
public abstract class AbstractOptions<T extends AbstractOptions<T>> {
public T timeoutMs(Integer timeoutMs);
public Integer timeoutMs();
}
Common Options Classes:
- CreateTopicsOptions
- DeleteTopicsOptions
- ListTopicsOptions
- DescribeTopicsOptions
- CreatePartitionsOptions
- DescribeConfigsOptions
- AlterConfigsOptions
- DescribeConsumerGroupsOptions
- ListConsumerGroupsOptions
- ListConsumerGroupOffsetsOptions
- AlterConsumerGroupOffsetsOptions
- DeleteConsumerGroupsOptions
- RemoveMembersFromConsumerGroupOptions
- DescribeClusterOptions
- ListOffsetsOptions
Usage Example:
import org.apache.kafka.clients.admin.*;
import java.time.Duration;
CreateTopicsOptions options = new CreateTopicsOptions()
.timeoutMs(30000)
.validateOnly(true); // Validate without creating
ListTopicsOptions listOptions = new ListTopicsOptions()
.timeoutMs(10000)
.listInternal(false);All result classes provide KafkaFuture<T> methods for asynchronous operations:
package org.apache.kafka.clients.admin;
// Common pattern
public class SomeResult {
// Future for all items
KafkaFuture<Void> all();
// Futures for individual items
Map<String, KafkaFuture<ItemType>> values();
}KafkaFuture:
package org.apache.kafka.common;
public abstract class KafkaFuture<T> implements Future<T> {
// Blocking methods
T get() throws InterruptedException, ExecutionException;
T get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
T getNow(T valueIfAbsent) throws InterruptedException, ExecutionException;
// Status methods
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
boolean isCompletedExceptionally();
// Composition methods
KafkaFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
<R> KafkaFuture<R> thenApply(BaseFunction<T, R> function);
// Static factory methods
static <U> KafkaFuture<U> completedFuture(U value);
static KafkaFuture<Void> allOf(KafkaFuture<?>... futures);
// Conversion
CompletionStage<T> toCompletionStage();
}Static Factory Methods:
Create pre-completed futures or combine multiple futures:
// Create a completed future
KafkaFuture<String> completed = KafkaFuture.completedFuture("result");
// Wait for all futures to complete
KafkaFuture<Void> all = KafkaFuture.allOf(future1, future2, future3);
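The per-item futures returned by result classes combine naturally with allOf. A short sketch, grounded in the values() and allOf signatures above, that waits on every topic from one create call:
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;
import java.util.*;
try (Admin admin = Admin.create(props)) {
    CreateTopicsResult result = admin.createTopics(Arrays.asList(
        new NewTopic("topic-a", 3, (short) 2),
        new NewTopic("topic-b", 3, (short) 2)));
    // Combine the per-topic futures and block until both complete
    Map<String, KafkaFuture<Void>> values = result.values();
    KafkaFuture.allOf(values.values().toArray(new KafkaFuture<?>[0])).get();
    System.out.println("Both topics created");
}
Future Composition Example: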
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;
import java.util.*;
import java.util.concurrent.CompletionStage;
try (Admin admin = Admin.create(props)) {
// Chain operations with thenApply
KafkaFuture<Integer> topicCountFuture = admin
.listTopics()
.names()
.thenApply(names -> names.size());
System.out.println("Total topics: " + topicCountFuture.get());
// Use whenComplete for callbacks (doesn't transform the result)
admin.createTopics(Collections.singletonList(new NewTopic("my-topic", 3, (short) 2)))
.all()
.whenComplete((v, throwable) -> {
if (throwable != null) {
System.err.println("Failed to create topic: " + throwable.getMessage());
} else {
System.out.println("Topic created successfully");
}
});
// Convert to CompletionStage for advanced composition with CompletableFuture
CompletionStage<Set<String>> stage = admin.listTopics().names().toCompletionStage();
stage.thenAccept(names -> System.out.println("Topics: " + names));
}Usage Example:
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;
import java.util.*;
try (Admin admin = Admin.create(props)) {
CreateTopicsResult result = admin.createTopics(
Collections.singletonList(new NewTopic("my-topic", 3, (short) 2)));
// Blocking wait
result.all().get();
// Non-blocking callback
result.all().whenComplete((v, throwable) -> {
if (throwable != null) {
System.err.println("Failed to create topic: " + throwable.getMessage());
} else {
System.out.println("Topic created successfully");
}
});
}Manage and inspect feature flags in the Kafka cluster. Use these operations to upgrade or downgrade cluster features.
// Admin interface method
DescribeFeaturesResult describeFeatures();
DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);DescribeFeaturesResult:
package org.apache.kafka.clients.admin;
public class DescribeFeaturesResult {
KafkaFuture<FeatureMetadata> featureMetadata();
}FeatureMetadata:
package org.apache.kafka.clients.admin;
public class FeatureMetadata {
Map<String, FinalizedVersionRange> finalizedFeatures();
Optional<Long> finalizedFeaturesEpoch();
Map<String, SupportedVersionRange> supportedFeatures();
}Usage Example:
import org.apache.kafka.clients.admin.*;
import java.util.*;
try (Admin admin = Admin.create(props)) {
DescribeFeaturesResult result = admin.describeFeatures();
FeatureMetadata metadata = result.featureMetadata().get();
System.out.println("Finalized Features:");
for (Map.Entry<String, FinalizedVersionRange> entry :
metadata.finalizedFeatures().entrySet()) {
FinalizedVersionRange range = entry.getValue();
System.out.println(" " + entry.getKey() +
": min=" + range.minVersionLevel() +
", max=" + range.maxVersionLevel());
}
System.out.println("\nSupported Features:");
for (Map.Entry<String, SupportedVersionRange> entry :
metadata.supportedFeatures().entrySet()) {
SupportedVersionRange range = entry.getValue();
System.out.println(" " + entry.getKey() +
": min=" + range.minVersion() +
", max=" + range.maxVersion());
}
}// Admin interface method
UpdateFeaturesResult updateFeatures(Map<String, FeatureUpdate> featureUpdates);
UpdateFeaturesResult updateFeatures(Map<String, FeatureUpdate> featureUpdates,
UpdateFeaturesOptions options);FeatureUpdate:
package org.apache.kafka.clients.admin;
public class FeatureUpdate {
public FeatureUpdate(short maxVersionLevel, UpgradeType upgradeType);
short maxVersionLevel();
UpgradeType upgradeType();
enum UpgradeType {
UNKNOWN,
UPGRADE, // Upgrade the feature level
SAFE_DOWNGRADE, // Downgrade without metadata loss
UNSAFE_DOWNGRADE // Downgrade allowing metadata loss
}
}UpdateFeaturesResult:
package org.apache.kafka.clients.admin;
public class UpdateFeaturesResult {
Map<String, KafkaFuture<Void>> values();
KafkaFuture<Void> all();
}Usage Example:
import org.apache.kafka.clients.admin.*;
import java.util.*;
try (Admin admin = Admin.create(props)) {
// Upgrade a feature to version 2
Map<String, FeatureUpdate> updates = new HashMap<>();
updates.put("group.version", new FeatureUpdate(
(short) 2,
FeatureUpdate.UpgradeType.UPGRADE
));
UpdateFeaturesResult result = admin.updateFeatures(updates);
result.all().get();
System.out.println("Feature updated successfully");
// Safe downgrade a feature
Map<String, FeatureUpdate> downgrades = new HashMap<>();
downgrades.put("metadata.version", new FeatureUpdate(
(short) 1,
FeatureUpdate.UpgradeType.SAFE_DOWNGRADE
));
UpdateFeaturesResult downgradeResult = admin.updateFeatures(downgrades);
downgradeResult.all().get();
System.out.println("Feature downgraded successfully");
}Query information about the KRaft metadata quorum. This operation is only available when using KRaft mode (not ZooKeeper).
// Admin interface method
DescribeMetadataQuorumResult describeMetadataQuorum();
DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options);DescribeMetadataQuorumResult:
package org.apache.kafka.clients.admin;
public class DescribeMetadataQuorumResult {
KafkaFuture<QuorumInfo> quorumInfo();
}Usage Example:
import org.apache.kafka.clients.admin.*;
try (Admin admin = Admin.create(props)) {
DescribeMetadataQuorumResult result = admin.describeMetadataQuorum();
QuorumInfo info = result.quorumInfo().get();
System.out.println("Leader ID: " + info.leaderId());
System.out.println("Leader Epoch: " + info.leaderEpoch());
System.out.println("High Watermark: " + info.highWatermark());
System.out.println("Voters:");
for (QuorumInfo.ReplicaState voter : info.voters()) {
System.out.println(" ID: " + voter.replicaId() +
", Offset: " + voter.logEndOffset());
}
System.out.println("Observers:");
for (QuorumInfo.ReplicaState observer : info.observers()) {
System.out.println(" ID: " + observer.replicaId() +
", Offset: " + observer.logEndOffset());
}
}List all consumer, share, and streams groups in the cluster. This operation replaces the deprecated listConsumerGroups() method and provides a unified view of all group types.
// Admin interface method
ListGroupsResult listGroups();
ListGroupsResult listGroups(ListGroupsOptions options);ListGroupsResult:
package org.apache.kafka.clients.admin;
public class ListGroupsResult {
KafkaFuture<Collection<GroupListing>> all();
KafkaFuture<Collection<GroupListing>> valid();
KafkaFuture<Collection<Throwable>> errors();
}GroupListing:
package org.apache.kafka.clients.admin;
public class GroupListing {
String groupId();
Optional<GroupType> type();
Optional<String> protocol();
Optional<GroupState> state();
}Usage Example:
import org.apache.kafka.clients.admin.*;
import java.util.*;
try (Admin admin = Admin.create(props)) {
ListGroupsResult result = admin.listGroups();
Collection<GroupListing> groups = result.all().get();
for (GroupListing group : groups) {
System.out.println("Group: " + group.groupId());
System.out.println(" Type: " + group.type().orElse(null));
System.out.println(" State: " + group.state().orElse(null));
System.out.println(" Protocol: " + group.protocol().orElse("unknown"));
}
// Only get valid groups (filter out errors)
Collection<GroupListing> validGroups = result.valid().get();
System.out.println("\nTotal valid groups: " + validGroups.size());
}
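To act on a single group type, filter the listings client-side via GroupListing.type(); the errors() future surfaces per-listing failures that valid() silently skips. A minimal sketch (it assumes the GroupType enum in org.apache.kafka.common includes a STREAMS constant, consistent with the streams groups listed by this API):
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.GroupType;
import java.util.*;
try (Admin admin = Admin.create(props)) {
    ListGroupsResult result = admin.listGroups();
    // valid() skips listings that failed; errors() reports those failures
    result.errors().get().forEach(t ->
        System.err.println("Listing error: " + t.getMessage()));
    result.valid().get().stream()
        .filter(g -> g.type().map(t -> t == GroupType.STREAMS).orElse(false))
        .forEach(g -> System.out.println("Streams group: " + g.groupId()));
}
Describe legacy consumer groups (groups using the classic group protocol, not the new consumer group protocol).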
// Admin interface method
DescribeClassicGroupsResult describeClassicGroups(Collection<String> groupIds);
DescribeClassicGroupsResult describeClassicGroups(Collection<String> groupIds,
DescribeClassicGroupsOptions options);Usage Example:
import org.apache.kafka.clients.admin.*;
import java.util.*;
try (Admin admin = Admin.create(props)) {
DescribeClassicGroupsResult result = admin.describeClassicGroups(
Arrays.asList("legacy-group-1"));
Map<String, ClassicGroupDescription> descriptions = result.all().get();
for (Map.Entry<String, ClassicGroupDescription> entry : descriptions.entrySet()) {
ClassicGroupDescription desc = entry.getValue();
System.out.println("Classic Group: " + desc.groupId());
System.out.println(" Protocol Type: " + desc.protocolType());
System.out.println(" State: " + desc.state());
System.out.println(" Members: " + desc.members().size());
for (MemberDescription member : desc.members()) {
System.out.println(" Member: " + member.memberId());
System.out.println(" Client ID: " + member.clientId());
System.out.println(" Host: " + member.host());
}
}
}Remove a broker from the cluster. This operation is useful for gracefully decommissioning brokers in KRaft mode.
// Admin interface method
UnregisterBrokerResult unregisterBroker(int brokerId);
UnregisterBrokerResult unregisterBroker(int brokerId, UnregisterBrokerOptions options);Usage Example:
import org.apache.kafka.clients.admin.*;
try (Admin admin = Admin.create(props)) {
UnregisterBrokerResult result = admin.unregisterBroker(3);
result.all().get();
System.out.println("Broker 3 unregistered successfully");
}Add a new voter to the KRaft metadata quorum. This operation is only available in KRaft mode.
// Admin interface method
AddRaftVoterResult addRaftVoter(int voterId, Uuid voterDirectoryId, Set<RaftVoterEndpoint> endpoints);
AddRaftVoterResult addRaftVoter(int voterId, Uuid voterDirectoryId, Set<RaftVoterEndpoint> endpoints,
AddRaftVoterOptions options);RaftVoterEndpoint:
package org.apache.kafka.clients.admin;
public class RaftVoterEndpoint {
public RaftVoterEndpoint(String name, String host, int port);
String name();
String host();
int port();
}Usage Example:
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.Uuid;
import java.util.*;
try (Admin admin = Admin.create(props)) {
Set<RaftVoterEndpoint> endpoints = new HashSet<>();
endpoints.add(new RaftVoterEndpoint("controller", "broker4.example.com", 9093));
AddRaftVoterResult result = admin.addRaftVoter(
4, // Voter ID
Uuid.randomUuid(), // Voter directory ID
endpoints
);
result.all().get();
System.out.println("Raft voter added successfully");
}Remove a voter from the KRaft metadata quorum. This operation is only available in KRaft mode.
// Admin interface method
RemoveRaftVoterResult removeRaftVoter(int voterId);
RemoveRaftVoterResult removeRaftVoter(int voterId, RemoveRaftVoterOptions options);Usage Example:
import org.apache.kafka.clients.admin.*;
try (Admin admin = Admin.create(props)) {
RemoveRaftVoterResult result = admin.removeRaftVoter(4);
result.all().get();
System.out.println("Raft voter removed successfully");
}List all client metrics configurations in the cluster. This operation is useful for managing client-side metrics collection.
// Admin interface method
ListClientMetricsResourcesResult listClientMetricsResources();
ListClientMetricsResourcesResult listClientMetricsResources(ListClientMetricsResourcesOptions options);ListClientMetricsResourcesResult:
package org.apache.kafka.clients.admin;
public class ListClientMetricsResourcesResult {
KafkaFuture<Collection<ClientMetricsResourceListing>> all();
}Usage Example:
import org.apache.kafka.clients.admin.*;
import java.util.*;
try (Admin admin = Admin.create(props)) {
ListClientMetricsResourcesResult result = admin.listClientMetricsResources();
Collection<ClientMetricsResourceListing> resources = result.all().get();
for (ClientMetricsResourceListing resource : resources) {
System.out.println("Client Metrics Resource: " + resource.name());
}
}List available configuration resource types in the cluster without retrieving their actual configuration values.
// Admin interface method
ListConfigResourcesResult listConfigResources();
ListConfigResourcesResult listConfigResources(Set<ConfigResource.Type> types);
ListConfigResourcesResult listConfigResources(Set<ConfigResource.Type> types,
ListConfigResourcesOptions options);Usage Example:
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.config.ConfigResource;
import java.util.*;
try (Admin admin = Admin.create(props)) {
// List all topic config resources
Set<ConfigResource.Type> types = Collections.singleton(ConfigResource.Type.TOPIC);
ListConfigResourcesResult result = admin.listConfigResources(types);
Collection<ConfigResource> resources = result.all().get();
for (ConfigResource resource : resources) {
System.out.println("Config Resource: " + resource.name() +
" (Type: " + resource.type() + ")");
}
// List all config resource types
ListConfigResourcesResult allResult = admin.listConfigResources();
Collection<ConfigResource> allResources = allResult.all().get();
System.out.println("\nTotal config resources: " + allResources.size());
}Always close the admin client:
try (Admin admin = Admin.create(props)) {
// Use admin client
}
// Automatically closed
// Or manual close
Admin admin = Admin.create(props);
try {
// Use admin client
} finally {
admin.close();
}import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.errors.*;
import java.util.*;
import java.util.concurrent.ExecutionException;
try (Admin admin = Admin.create(props)) {
CreateTopicsResult result = admin.createTopics(
Collections.singletonList(new NewTopic("my-topic", 3, (short) 2)));
result.all().get();
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof TopicExistsException) {
System.out.println("Topic already exists");
} else if (cause instanceof InvalidReplicationFactorException) {
System.err.println("Invalid replication factor");
} else if (cause instanceof InvalidPartitionsException) {
System.err.println("Invalid partition count");
} else {
System.err.println("Failed to create topic: " + cause.getMessage());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}Process multiple resources efficiently:
import org.apache.kafka.clients.admin.*;
import java.util.*;
try (Admin admin = Admin.create(props)) {
// Create multiple topics in one call
List<NewTopic> topics = Arrays.asList(
new NewTopic("topic1", 3, (short) 2),
new NewTopic("topic2", 5, (short) 2),
new NewTopic("topic3", 1, (short) 2)
);
CreateTopicsResult result = admin.createTopics(topics);
// Wait for all or handle individually
for (Map.Entry<String, KafkaFuture<Void>> entry : result.values().entrySet()) {
try {
entry.getValue().get();
System.out.println("Created topic: " + entry.getKey());
} catch (Exception e) {
System.err.println("Failed to create topic " + entry.getKey() +
": " + e.getMessage());
}
}
}import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import java.util.Map;
try (Admin admin = Admin.create(props)) {
Map<MetricName, ? extends Metric> metrics = admin.metrics();
for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
MetricName name = entry.getKey();
System.out.println(name.group() + "." + name.name() + ": " +
entry.getValue().metricValue());
}
}Symptoms:
Topic creation fails with TopicExistsException even though this application never created the topic.
Causes:
The topic was already created by another client, an automated provisioning job, or a previous run of the same code.
Solutions:
Treat TopicExistsException as success (idempotent creation) and optionally verify that the existing topic matches the expected configuration, as in the helper below:
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.errors.TopicExistsException;
import java.util.*;
import java.util.concurrent.ExecutionException;
public void createTopicIdempotent(Admin admin, String topicName,
int partitions, short replicationFactor) {
NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor);
try {
admin.createTopics(Collections.singletonList(newTopic)).all().get();
System.out.println("Topic created: " + topicName);
} catch (ExecutionException e) {
if (e.getCause() instanceof TopicExistsException) {
System.out.println("Topic already exists: " + topicName);
// Optionally verify configuration matches
DescribeTopicsResult describeResult =
admin.describeTopics(Collections.singletonList(topicName));
TopicDescription description =
describeResult.allTopicNames().get().get(topicName);
if (description.partitions().size() != partitions) {
System.err.println("WARNING: Topic exists but has " +
description.partitions().size() + " partitions, expected " + partitions);
}
} else {
throw new RuntimeException("Failed to create topic", e);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted", e);
}
}Prevention:
Centralize topic provisioning in one component and use an idempotent create pattern like the one above wherever topics are created.
Symptoms:
Admin operations fail with TimeoutException, either from the client's request timeout or from a blocking get() call.
Causes:
Overloaded or slow brokers, very large batch operations, network latency, or default timeouts that are too short for the workload.
Solutions:
Raise the client-level timeouts, set per-operation timeouts, or wait on each item's future with its own timeout, as shown below:
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;
import java.util.*;
import java.util.concurrent.*;
// Increase default timeout
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000); // 60 seconds
props.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 120000); // 2 minutes
try (Admin admin = Admin.create(props)) {
// Or set timeout per operation
CreateTopicsOptions options = new CreateTopicsOptions()
.timeoutMs(120000); // 2 minutes
CreateTopicsResult result = admin.createTopics(topics, options);
result.all().get();
}
// For very long operations, monitor progress per topic
try (Admin admin = Admin.create(props)) {
CreateTopicsResult result = admin.createTopics(largeBatchOfTopics);
// Check individual topic results
Map<String, KafkaFuture<Void>> values = result.values();
for (Map.Entry<String, KafkaFuture<Void>> entry : values.entrySet()) {
try {
entry.getValue().get(60, TimeUnit.SECONDS);
System.out.println("Created: " + entry.getKey());
} catch (TimeoutException e) {
System.err.println("Timeout creating: " + entry.getKey());
} catch (ExecutionException e) {
System.err.println("Error creating " + entry.getKey() +
": " + e.getCause().getMessage());
}
}
}Prevention:
Set generous values for REQUEST_TIMEOUT_MS_CONFIG and DEFAULT_API_TIMEOUT_MS_CONFIG up front, and split very large operations into smaller batches.
Symptoms:
Operations on a newly created topic fail with UnknownTopicOrPartitionException immediately after createTopics() succeeds.
Causes:
Topic metadata has not yet propagated to every broker in the cluster.
Solutions:
Poll describeTopics() until the topic becomes visible before using it:
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import java.util.*;
import java.util.concurrent.ExecutionException;
public void waitForTopicCreation(Admin admin, String topicName,
int maxWaitMs) throws Exception {
long startTime = System.currentTimeMillis();
while (System.currentTimeMillis() - startTime < maxWaitMs) {
try {
DescribeTopicsResult result =
admin.describeTopics(Collections.singletonList(topicName));
result.allTopicNames().get();
System.out.println("Topic ready: " + topicName);
return;
} catch (ExecutionException e) {
if (e.getCause() instanceof UnknownTopicOrPartitionException) {
System.out.println("Waiting for topic metadata to propagate...");
Thread.sleep(1000);
} else {
throw e;
}
}
}
throw new TimeoutException("Topic not ready after " + maxWaitMs + "ms");
}
// Usage
admin.createTopics(Collections.singletonList(newTopic)).all().get();
waitForTopicCreation(admin, topicName, 30000); // Wait up to 30s
Prevention:
Do not assume a topic is usable the moment createTopics() returns; wait until describeTopics() succeeds before wiring up producers or consumers.
// Multiple admin clients operating simultaneously
// Can cause conflicts
// Safe: Check-then-create pattern with proper exception handling
try (Admin admin = Admin.create(props)) {
// Check if topic exists
ListTopicsResult listResult = admin.listTopics();
Set<String> existingTopics = listResult.names().get();
if (!existingTopics.contains("my-topic")) {
try {
admin.createTopics(
Collections.singletonList(new NewTopic("my-topic", 3, (short) 2))
).all().get();
System.out.println("Topic created");
} catch (ExecutionException e) {
if (e.getCause() instanceof TopicExistsException) {
// Another client created it between check and create
System.out.println("Topic created by another client");
} else {
throw e;
}
}
}
}// When creating multiple topics, some may succeed and others fail
try (Admin admin = Admin.create(props)) {
List<NewTopic> topics = Arrays.asList(
new NewTopic("topic1", 3, (short) 2),
new NewTopic("existing-topic", 3, (short) 2), // Already exists
new NewTopic("topic3", 3, (short) 2)
);
CreateTopicsResult result = admin.createTopics(topics);
// Don't use result.all().get() here - it throws if any topic fails,
// hiding which topics succeeded. Check each future individually instead.
// Instead, check each individually
Map<String, KafkaFuture<Void>> values = result.values();
Set<String> successful = new HashSet<>();
Set<String> failed = new HashSet<>();
for (Map.Entry<String, KafkaFuture<Void>> entry : values.entrySet()) {
try {
entry.getValue().get();
successful.add(entry.getKey());
} catch (ExecutionException e) {
failed.add(entry.getKey());
System.err.println("Failed to create " + entry.getKey() +
": " + e.getCause().getMessage());
}
}
System.out.println("Created: " + successful);
System.out.println("Failed: " + failed);
}// Describing consumer groups during rebalance
try (Admin admin = Admin.create(props)) {
DescribeConsumerGroupsResult result =
admin.describeConsumerGroups(Collections.singletonList("my-group"));
ConsumerGroupDescription description =
result.all().get().get("my-group");
GroupState state = description.groupState();
if (state == GroupState.PREPARING_REBALANCE || state == GroupState.COMPLETING_REBALANCE) {
System.out.println("Group is rebalancing - member info may be incomplete");
// Wait and retry
Thread.sleep(5000);
DescribeConsumerGroupsResult retryResult =
admin.describeConsumerGroups(Collections.singletonList("my-group"));
description = retryResult.all().get().get("my-group");
}
System.out.println("State: " + description.groupState());
System.out.println("Members: " + description.members().size());
}// Cannot delete active consumer group
try (Admin admin = Admin.create(props)) {
try {
admin.deleteConsumerGroups(Collections.singletonList("my-group")).all().get();
System.out.println("Group deleted");
} catch (ExecutionException e) {
if (e.getCause() instanceof GroupNotEmptyException) {
System.out.println("Group has active members, cannot delete");
// Option 1: Remove members first
RemoveMembersFromConsumerGroupOptions removeOptions =
new RemoveMembersFromConsumerGroupOptions();
admin.removeMembersFromConsumerGroup("my-group", removeOptions).all().get();
Thread.sleep(5000); // Wait for members to leave
// Then delete
admin.deleteConsumerGroups(Collections.singletonList("my-group")).all().get();
// Option 2: Wait for consumers to close
System.out.println("Close all consumers in the group first");
} else {
throw e;
}
}
}public class SafeAdminOperations {
public void safeCreateTopic(Admin admin, String topicName,
int partitions, short replicationFactor) throws Exception {
// Check cluster has enough brokers for replication
DescribeClusterResult clusterResult = admin.describeCluster();
int brokerCount = clusterResult.nodes().get().size();
if (replicationFactor > brokerCount) {
throw new IllegalArgumentException(
"Replication factor " + replicationFactor +
" exceeds broker count " + brokerCount);
}
// Check topic doesn't exist
ListTopicsResult listResult = admin.listTopics();
if (listResult.names().get().contains(topicName)) {
System.out.println("Topic already exists: " + topicName);
return;
}
// Create topic
NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor);
admin.createTopics(Collections.singletonList(newTopic)).all().get();
System.out.println("Topic created: " + topicName);
}
public void safeDeleteTopic(Admin admin, String topicName) throws Exception {
// Check topic exists
ListTopicsResult listResult = admin.listTopics();
if (!listResult.names().get().contains(topicName)) {
System.out.println("Topic doesn't exist: " + topicName);
return;
}
// Check for active consumers
// listConsumerGroups() is deprecated in favor of listGroups(); it still works here
List<ConsumerGroupListing> groups =
new ArrayList<>(admin.listConsumerGroups().all().get());
for (ConsumerGroupListing group : groups) {
ListConsumerGroupOffsetsResult offsetResult =
admin.listConsumerGroupOffsets(group.groupId());
Map<TopicPartition, OffsetAndMetadata> offsets =
offsetResult.partitionsToOffsetAndMetadata().get();
boolean consumingFromTopic = offsets.keySet().stream()
.anyMatch(tp -> tp.topic().equals(topicName));
if (consumingFromTopic) {
System.err.println("WARNING: Consumer group " + group.groupId() +
" is actively consuming from topic " + topicName);
}
}
// Delete topic
admin.deleteTopics(Collections.singletonList(topicName)).all().get();
System.out.println("Topic deleted: " + topicName);
}
}import org.apache.kafka.clients.admin.*;
import java.util.*;
import java.util.concurrent.*;
public class RateLimitedAdmin {
private final Admin admin;
private final Semaphore rateLimiter;
private final ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor();
public RateLimitedAdmin(Admin admin, int operationsPerSecond) {
this.admin = admin;
this.rateLimiter = new Semaphore(operationsPerSecond);
}
public void createTopicWithRateLimit(String topicName) throws Exception {
rateLimiter.acquire();
try {
NewTopic newTopic = new NewTopic(topicName, 3, (short) 2);
admin.createTopics(Collections.singletonList(newTopic)).all().get();
System.out.println("Created topic: " + topicName);
} finally {
// Return the permit after one second to bound throughput; a shared
// scheduler avoids creating a new Timer thread on every call
scheduler.schedule(rateLimiter::release, 1, TimeUnit.SECONDS);
}
}
}import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;
import java.util.*;
import java.util.concurrent.CompletableFuture;
public class AsyncAdminOperations {
public CompletableFuture<Void> createTopicsAsync(Admin admin,
List<NewTopic> topics) {
CreateTopicsResult result = admin.createTopics(topics);
// Convert KafkaFuture to CompletableFuture (equivalently:
// result.all().toCompletionStage().toCompletableFuture())
CompletableFuture<Void> future = new CompletableFuture<>();
result.all().whenComplete((v, throwable) -> {
if (throwable != null) {
future.completeExceptionally(throwable);
} else {
future.complete(null);
}
});
return future;
}
public void chainOperations(Admin admin) {
// Create topics, then describe them
createTopicsAsync(admin, Arrays.asList(
new NewTopic("topic1", 3, (short) 2),
new NewTopic("topic2", 5, (short) 2)
)).thenCompose(v -> {
// After topics created, describe them
DescribeTopicsResult describeResult =
admin.describeTopics(Arrays.asList("topic1", "topic2"));
CompletableFuture<Map<String, TopicDescription>> describeFuture =
new CompletableFuture<>();
describeResult.allTopicNames().whenComplete((descriptions, throwable) -> {
if (throwable != null) {
describeFuture.completeExceptionally(throwable);
} else {
describeFuture.complete(descriptions);
}
});
return describeFuture;
}).thenAccept(descriptions -> {
// Process descriptions
for (Map.Entry<String, TopicDescription> entry : descriptions.entrySet()) {
System.out.println("Topic " + entry.getKey() +
" has " + entry.getValue().partitions().size() + " partitions");
}
}).exceptionally(throwable -> {
System.err.println("Operation failed: " + throwable.getMessage());
return null;
});
}
}// Consumer group with no active members
try (Admin admin = Admin.create(props)) {
DescribeConsumerGroupsResult result =
admin.describeConsumerGroups(Collections.singletonList("empty-group"));
ConsumerGroupDescription description =
result.all().get().get("empty-group");
if (description.members().isEmpty()) {
System.out.println("Group has no active members");
// Can still have committed offsets
ListConsumerGroupOffsetsResult offsetsResult =
admin.listConsumerGroupOffsets("empty-group");
Map<TopicPartition, OffsetAndMetadata> offsets =
offsetsResult.partitionsToOffsetAndMetadata().get();
if (!offsets.isEmpty()) {
System.out.println("Group has committed offsets: " + offsets.size());
// Can delete group or reset offsets
}
}
}// Monitor partition reassignment progress
public void monitorReassignment(Admin admin, TopicPartition partition) throws Exception {
while (true) {
ListPartitionReassignmentsResult result =
admin.listPartitionReassignments(Collections.singleton(partition));
Map<TopicPartition, PartitionReassignment> reassignments =
result.reassignments().get();
if (reassignments.isEmpty()) {
System.out.println("Reassignment complete for " + partition);
break;
}
PartitionReassignment reassignment = reassignments.get(partition);
System.out.println("Reassignment in progress:");
System.out.println(" Current replicas: " + reassignment.replicas());
System.out.println(" Adding replicas: " + reassignment.addingReplicas());
System.out.println(" Removing replicas: " + reassignment.removingReplicas());
// Check replica sync status
DescribeTopicsResult describeResult =
admin.describeTopics(Collections.singletonList(partition.topic()));
TopicDescription topicDesc =
describeResult.allTopicNames().get().get(partition.topic());
TopicPartitionInfo partitionInfo =
topicDesc.partitions().get(partition.partition());
System.out.println(" ISR: " + partitionInfo.isr().size() + "/" +
partitionInfo.replicas().size());
Thread.sleep(5000); // Check every 5 seconds
}
}// ACLs can be created for non-existent topics
// This is by design - it allows pre-provisioning security
import org.apache.kafka.common.acl.*;
import org.apache.kafka.common.resource.*;
import java.util.*;
try (Admin admin = Admin.create(props)) {
// Create ACL for topic that doesn't exist yet
AclBinding aclBinding = new AclBinding(
new ResourcePattern(ResourceType.TOPIC, "future-topic", PatternType.LITERAL),
new AccessControlEntry("User:alice", "*", AclOperation.READ,
AclPermissionType.ALLOW)
);
admin.createAcls(Collections.singleton(aclBinding)).all().get();
System.out.println("ACL created for non-existent topic");
// Later, when topic is created, ACL is already in effect
admin.createTopics(Collections.singletonList(
new NewTopic("future-topic", 3, (short) 2)
)).all().get();
System.out.println("Topic created with pre-existing ACL");
}// Create many topics efficiently
public void createManyTopics(Admin admin, int count) throws Exception {
// Create in batches to avoid overwhelming broker
int batchSize = 100;
for (int i = 0; i < count; i += batchSize) {
List<NewTopic> batch = new ArrayList<>();
for (int j = i; j < Math.min(i + batchSize, count); j++) {
batch.add(new NewTopic("topic-" + j, 3, (short) 2));
}
CreateTopicsResult result = admin.createTopics(batch);
result.all().get(); // Wait for batch to complete
System.out.println("Created batch " + (i / batchSize + 1) +
": topics " + i + " to " + Math.min(i + batchSize - 1, count - 1));
// Small delay between batches
Thread.sleep(1000);
}
}// Admin client is thread-safe and should be shared
import java.time.Duration;
import java.util.Properties;
public class AdminClientPool {
private static Admin instance;
public static synchronized Admin getInstance(Properties props) {
// Note: props passed on later calls are ignored once the instance exists
if (instance == null) {
instance = Admin.create(props);
// Register shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
if (instance != null) {
instance.close(Duration.ofSeconds(30));
}
}));
}
return instance;
}
}
// Usage across application
Admin admin = AdminClientPool.getInstance(props);
admin.createTopics(topics);
// Reuse same instance
admin.describeTopics(topicNames);
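Because a single Admin instance is safe to share, concurrent tasks can reuse it directly. A minimal sketch of fanning work out over one shared client (the executor setup and topic names are illustrative):
import org.apache.kafka.clients.admin.*;
import java.util.*;
import java.util.concurrent.*;
ExecutorService pool = Executors.newFixedThreadPool(4);
Admin shared = AdminClientPool.getInstance(props);
List<Callable<Void>> tasks = new ArrayList<>();
for (String topic : Arrays.asList("orders", "payments", "shipments")) {
    tasks.add(() -> {
        // Every task shares the same thread-safe Admin instance
        shared.describeTopics(Collections.singletonList(topic)).allTopicNames().get();
        System.out.println("Described: " + topic);
        return null;
    });
}
pool.invokeAll(tasks); // Blocks until all tasks finish
pool.shutdown();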