首先,需要检查你的 AWS MSK 集群上是否启用了日志清理器。如果启用了,则可以使用以下代码示例来检查日志清理器是否记录了任何内容:
import org.apache.kafka.clients.admin.*;
import java.util.*;
public class KafkaLogCleanerChecker {
public static void main(String[] args) throws Exception {
// 设置 Kafka 集群的连接信息
String bootstrapServers = "your-bootstrap-servers";
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// 创建 AdminClient 对象
AdminClient adminClient = AdminClient.create(props);
// 获取日志清理器配置信息
ConfigResource logCleanerResource = new ConfigResource(Type.BROKER, "your-broker-id");
DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(Collections.singleton(logCleanerResource));
Config config = describeConfigsResult.all().get().get(logCleanerResource);
// 检查日志清理器是否记录了任何内容
String logCleanerEnable = config.get("log.cleaner.enable").value();
if (logCleanerEnable != null && logCleanerEnable.equals("true")) {
String logCleanerDedupeBufferSize = config.get("log.cleaner.dedupe.buffer.size").value();
System.out.println("日志清理器已启用,并且日志大小为 " + logCleanerDedupeBufferSize);
} else {
System.out.println("日志清理器未启用");
}
// 关闭 AdminClient 对象
adminClient.close();
}
}
在代码中,我们使用 AdminClient 类来获取 Kafka 集群的配置信息,然后检查日志清理器是否启用并记录了任何内容。如果日志清理器已启用,则打印日志大小,否则打印“日志清理器未启用”的消息。你需要将 your-bootstrap-servers 和 your-broker-id 用实际值替