Add the following dependency to your project's pom.xml:
<dependency>
  <groupId>com.amazonaws</groupId>
  <artifactId>aws-java-sdk</artifactId>
  <version>1.11.886</version>
</dependency>
Upgrade ElasticsearchIO to version 2.18.0 or later:
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-elasticsearch</artifactId>
  <version>2.28.0</version>
</dependency>
Because OpenSearch on AWS uses AWS Signature Version 4, ElasticsearchIO needs to be configured to use AWS4 signing:
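import com.amazonaws.ClientConfiguration;
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;

// "AWS4SignerType" selects the AWS SDK's Signature Version 4 signer.
ClientConfiguration clientConfiguration = new ClientConfiguration();
clientConfiguration.setSignerOverride("AWS4SignerType");
// create(...) takes the node addresses as a String[]. Check that withClientConfiguration(...)
// exists in your ElasticsearchIO version; I could not confirm it in the documented API.
ElasticsearchIO.ConnectionConfiguration connectionConfiguration =
    ElasticsearchIO.ConnectionConfiguration.create(endPoint, indexName)
        .withUsernameAndPassword(username, password)
        .withClientConfiguration(clientConfiguration);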
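To give a feel for what AWS4 signing involves, here is a minimal sketch of the Signature Version 4 signing-key derivation using only the JDK. It is an illustration, not what the AWS SDK or ElasticsearchIO runs internally; the class name `SigV4KeySketch` and the secret and date passed in `main` are placeholders I made up for the example.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

/** Illustrative only: derives a Signature Version 4 signing key with HMAC-SHA256. */
final class SigV4KeySketch {

    // One HMAC-SHA256 step: authenticate `data` with `key`.
    static byte[] hmacSha256(byte[] key, String data) {
        try {
            Mac mac = Mac.getInstance("HmacSHA256");
            mac.init(new SecretKeySpec(key, "HmacSHA256"));
            return mac.doFinal(data.getBytes(StandardCharsets.UTF_8));
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("HmacSHA256 is not available", e);
        }
    }

    // Key chain: "AWS4" + secret -> date -> region -> service -> "aws4_request".
    static byte[] signingKey(String secretKey, String dateStamp, String region, String service) {
        byte[] kSecret = ("AWS4" + secretKey).getBytes(StandardCharsets.UTF_8);
        byte[] kDate = hmacSha256(kSecret, dateStamp);
        byte[] kRegion = hmacSha256(kDate, region);
        byte[] kService = hmacSha256(kRegion, service);
        return hmacSha256(kService, "aws4_request");
    }

    // Lowercase hex encoding, two characters per byte.
    static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder(bytes.length * 2);
        for (byte b : bytes) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Placeholder secret and date (yyyyMMdd); "us-east-1" and "es" match the signer settings in this answer.
        byte[] key = signingKey("EXAMPLE-SECRET", "20240101", "us-east-1", "es");
        System.out.println(toHex(key));
    }
}
```

The derived key is scoped to one day, one region, and one service, which is why the signer needs both the service name and the region to be set correctly.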
Provide the AWS access keys so that requests can be authenticated:
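import com.amazonaws.ClientConfiguration;
import com.amazonaws.DefaultRequest;
import com.amazonaws.Request;
import com.amazonaws.auth.AWS4Signer;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.http.AmazonHttpClient;
import com.amazonaws.http.HttpMethodName;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

// HTTP client from the AWS SDK, using default settings.
AmazonHttpClient httpClient = new AmazonHttpClient(new ClientConfiguration());

// The signer needs the service name ("es") and the region of your domain.
AWS4Signer signer = new AWS4Signer();
signer.setServiceName("es");
signer.setRegionName("us-east-1");

// DefaultRequest takes the service name; replace the endpoint placeholder with your domain URL.
Request<Void> request = new DefaultRequest<>("es");
request.setEndpoint(URI.create("AWS Opensearch URL"));
request.setHttpMethod(HttpMethodName.GET);
request.setHeaders(new HashMap<>());
Map<String, String> headers = request.getHeaders();
headers.put("Host", "");
headers.put("Content-Type", "application/json");

// Sign the request; fill in the access key ID and secret key.
signer.sign(request, new AWSCredentials() {
    @Override
    public String getAWSAccessKeyId() {
        return "";
    }

    @Override
    public String getAWSSecretKey() {
        return "";
    }
});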
Response httpResponse = httpClient.execute
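The anonymous AWSCredentials above returns empty strings as placeholders. One way to avoid hardcoding real keys is to read them from the standard AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables. The sketch below uses only the JDK; the class `EnvCredentialsSketch` and its `from` method are hypothetical names for this example, not part of the AWS SDK.

```java
import java.util.Map;

/** Hypothetical helper: holds an access key pair read from a map of environment variables. */
final class EnvCredentialsSketch {
    final String accessKeyId;
    final String secretKey;

    private EnvCredentialsSketch(String accessKeyId, String secretKey) {
        this.accessKeyId = accessKeyId;
        this.secretKey = secretKey;
    }

    // Reads both values and fails fast if either one is missing or empty.
    static EnvCredentialsSketch from(Map<String, String> env) {
        String id = env.get("AWS_ACCESS_KEY_ID");
        String secret = env.get("AWS_SECRET_ACCESS_KEY");
        if (id == null || id.isEmpty() || secret == null || secret.isEmpty()) {
            throw new IllegalStateException(
                "AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY must both be set");
        }
        return new EnvCredentialsSketch(id, secret);
    }

    public static void main(String[] args) {
        // System.getenv() returns the process environment as a map.
        EnvCredentialsSketch creds = from(System.getenv());
        System.out.println("Loaded access key ID of length " + creds.accessKeyId.length());
    }
}
```

The two fields can then be returned from getAWSAccessKeyId() and getAWSSecretKey() in place of the empty strings.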