以下是一个使用Apache Kafka客户端(Java)列出主题并检查主题是否进行日志压缩的解决方法的示例代码:
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartitionInfo;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class KafkaTopicCompressionCheck {
public static void main(String[] args) {
// 设置Kafka集群的地址
Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// 创建AdminClient
try (AdminClient adminClient = AdminClient.create(properties)) {
// 列出所有主题
ListTopicsOptions options = new ListTopicsOptions();
options.listInternal(true); // 包括内部主题(如__consumer_offsets)
ListTopicsResult topicsResult = adminClient.listTopics(options);
Map topics = topicsResult.namesToListings().get();
// 遍历主题并检查是否进行了日志压缩
for (Map.Entry entry : topics.entrySet()) {
String topicName = entry.getKey();
TopicDescription topicDescription = entry.getValue();
System.out.println("Topic: " + topicName);
System.out.println("Is compressed: " + isTopicCompressed(topicDescription));
System.out.println("Partitions: " + topicDescription.partitions().size());
for (TopicPartitionInfo partitionInfo : topicDescription.partitions()) {
System.out.println("Partition: " + partitionInfo.partition());
System.out.println("Is compressed: " + partitionInfo.isCompressed());
}
System.out.println();
}
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
// 检查主题是否进行了日志压缩
private static boolean isTopicCompressed(TopicDescription topicDescription) {
for (TopicPartitionInfo partitionInfo : topicDescription.partitions()) {
if (partitionInfo.isCompressed()) {
return true;
}
}
return false;
}
}
请注意,此示例代码假设Kafka集群位于本地主机上的默认端口9092。您需要根据您的实际情况修改BOOTSTRAP_SERVERS_CONFIG
属性的值。
此代码使用Kafka的AdminClient来连接到Kafka集群并列出主题。然后,它遍历每个主题的分区并检查每个分区是否进行了日志压缩。最后,它打印出结果。