是的,AWS MSK支持Kafka的KRaft模式。
以下是使用AWS SDK for Java v2实现Kafka KRaft模式在AWS MSK上交互的示例代码:
import software.amazon.awssdk.services.kafka.KafkaClient;
import software.amazon.awssdk.services.kafka.model.*;
public class KafkaClientApp {
public static void main(String[] args) {
String bootstrapBrokers = "BROKER_1:9092,BROKER_2:9092";
String topicName = "my-kafka-topic";
KafkaClient kafkaClient = KafkaClient.builder().build();
// 创建Kafka主题
CreateTopicRequest createTopicRequest = CreateTopicRequest.builder()
.name(topicName)
.numberOfPartitions(3)
.replicationFactor(2)
.build();
CreateTopicResponse createTopicResponse = kafkaClient.createTopic(createTopicRequest);
// 获取Kafka主题的ARNS
DescribeClusterRequest describeClusterRequest = DescribeClusterRequest.builder()
.clusterArn(clusterArn)
.build();
DescribeClusterResponse describeClusterResponse = kafkaClient.describeCluster(describeClusterRequest);
String clusterArn = describeClusterResponse.clusterInfo().clusterArn();
String topicArn = String.format("%s/topics/%s", clusterArn, topicName);
// 创建Kafka KRaft集群
CreateClusterRequest createClusterRequest = CreateClusterRequest.builder()
.brokerNodeGroupInfo(nodeGroupInfo)
.clusterName("my-kafka-krat-cluster")
.kafkaVersion("2.7.0")
.numberOfBrokerNodes(2)
.enhancedMonitoring(EnhancedMonitoring.PER_BROKER)
.openMonitoring(openMonitoringInfo)
.encryptionInfo(encryptionInfo)
.clientAuthentication(clientAuthentication)
.build();
CreateClusterResponse createClusterResponse = kafkaClient.createCluster(createClusterRequest);
String clusterArn = createClusterResponse.clusterArn();
// 更新Kafka主题以在KRaft模式下运行
UpdateBrokerStorageRequest updateBrokerStorageRequest = UpdateBrokerStorageRequest.builder()
.clusterArn(clusterArn)
.targetBrokerEBSVolumeInfo(targetBrokerEBSVolumes)
.build();
UpdateBrokerStorageResponse updateBrokerStorageResponse = kafkaClient.updateBrokerStorage(updateBrokerStorageRequest);
UpdateBrokerTypeRequest updateBrokerTypeRequest = UpdateBrokerTypeRequest.builder()
.clusterArn(clusterArn)
.brokerIds(Arrays.asList("0", "1"))
.currentVersion(kafkaVersion)
.targetInstanceType(targetInstanceType)
.engineVersion(engineVersion)
.build();
UpdateBrokerTypeResponse updateBrokerTypeResponse = kafkaClient.updateBrokerType(updateBrokerTypeRequest);
UpdateClusterKafkaVersionRequest updateClusterKafkaVersionRequest = UpdateClusterKafkaVersionRequest.builder()
.clusterArn(clusterArn)
.configurationInfo(kafkaConfigurationInfo)
.kafkaVersion(kafkaVersion)
.build();
UpdateClusterKafkaVersionResponse updateClusterKafkaVersionResponse = kafkaClient.updateClusterKafkaVersion(updateClusterKafkaVersionRequest);
UpdateClusterConfigurationRequest updateClusterConfigurationRequest = UpdateClusterConfigurationRequest.builder()
.clusterArn(clusterArn)
.configurationInfo(kafkaConfigurationInfo)
.build();
UpdateClusterConfigurationResponse updateClusterConfigurationResponse = kafkaClient.updateClusterConfiguration(updateClusterConfigurationRequest);
// 删除Kafka集群
DeleteClusterRequest deleteClusterRequest = DeleteClusterRequest.builder()
.clusterArn(clusterArn)
.currentVersion(kafkaVersion)
.build();
DeleteClusterResponse deleteClusterResponse = kafkaClient.deleteCluster(deleteCluster