在Kafka 2.8.0之前的版本中,使用ClusterMetadata对象检索集群中所有主题的详细信息时会发生崩溃。 在这种情况下,解决此问题的最佳方法是使用下面的代码示例中的新的KafkaAdminClient类代替ClusterMetadata类。
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionInfo;
public class KafkaTopicUtil {
private Admin admin;
public KafkaTopicUtil(String kafkaServers) {
admin = AdminClient.create(buildAdminConfig(kafkaServers));
}
public Collection getTopicNames() {
try {
ListTopicsResult result = admin.listTopics();
KafkaFuture> namesFuture = result.names();
return namesFuture.get(30, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new RuntimeException("Failed to get topic list", e);
}
}
public List getTopicDescriptions(String topic) {
try {
KafkaFuture topicDescriptionFuture = admin.describeTopics(Lists.newArrayList(topic)).values().get(topic);
TopicDescription description = topicDescriptionFuture.get(30, TimeUnit.SECONDS);
return Lists.newArrayList(description);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new RuntimeException("Failed to get topic descriptions", e);
}
}
public List getBrokers() {
try {
ClusterDescription description = admin.describeCluster().clusterDescription();
return description.nodes();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException("Failed to get cluster description", e);
}
}
public TopicPartitionInfo getPartition(String topic, int partition) {
TopicDescription description = getTopicDescriptions(topic).get(0