Kafka的主题由一个或多个分区组成,这些分区可以被分配到不同的Broker上。每个分区都有一个领导者和一个或多个副本。领导者负责处理该分区的所有读写请求。如果领导者宕机,则会从副本中选出新的领导者。因此,分区的领导者分布对Kafka集群的性能和可靠性都非常重要。
以下示例演示了如何检查分区领导者在各个Broker上的分布情况:
import java.util.Map;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.admin.TopicDescription.PartitionReplica;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
public class PartitionLeaderDistribution {
public static void main(String[] args) throws Exception {
// Set up an AdminClient instance
// Replace BOOTSTRAP_SERVERS with your Kafka broker addresses
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
"BOOTSTRAP_SERVERS");
AdminClient admin = AdminClient.create(props);
// Get a list of all topics in the cluster
ListTopicsResult topics = admin.listTopics();
Set topicNames = topics.names().get();
// Iterate through each topic and print its partition leader distribution
for (String topicName : topicNames) {
TopicDescription description = admin.describeTopics(Arrays.asList(topicName)).values().get(topicName).get();
List replicas = description.partitions().stream()
.flatMap(p -> p.replicas().stream())
.collect(Collectors.toList());
Map> leaderMap = replicas.stream()
.filter(PartitionReplica::isLeader)
.collect(Collectors.groupingBy(PartitionReplica::broker));
System.out.println("Topic: " + topicName);
for (Node node : leaderMap.keySet()) {
List topicPartitions = leaderMap.get(node);
System.out.println("Broker " + node.id() + ": " + topicPartitions.size() + " partitions");
}
System.out.println();
}
// Close the AdminClient instance
admin.close();
}
}
这段代码会输出每个Broker上的分区领导者数目,以及他们领导的主题。如果你发现
上一篇:ApacheKafka的消息被归档了-是否可能检索到这些消息?
下一篇:ApacheKafka服务器启动绑定到路径/brokers/ids/0,并使用地址deploy.static.akamaitechnologies.com:9092而不是localhost。