import boto3
import requests
from requests_aws4auth import AWS4Auth
import json
# Example 1: query Elasticsearch with SigV4-signed requests built from an
# explicit access key / secret key pair.

# Elasticsearch endpoint (host name only; the scheme is added when the URL is built)
endpoint = 'your-es-endpoint'
# Region of the Elasticsearch domain
region = 'your-region'
# Access key and secret key used to sign the requests.
# NOTE(review): these are placeholders — avoid committing real keys to the
# job script; prefer credentials resolved at runtime.
access_key = 'your-access-key'
secret_key = 'your-secret-key'
# Authentication: sign requests for the 'es' service
service = 'es'
es_auth = AWS4Auth(access_key, secret_key, region, service)
# Index name and document type to search
index = 'your-index-name'
# NOTE(review): the document-type URL segment depends on the Elasticsearch
# version in use — confirm it is still accepted by the target cluster.
doc_type = 'your-doc-type'
# Simple match-all query
query = {
    "query": {
        "match_all": {},
    },
}
# Send the search request
url = 'https://' + endpoint + '/' + index + '/' + doc_type + '/_search'
headers = {"Content-Type": "application/json"}
response = requests.post(
    url,
    auth=es_auth,
    headers=headers,
    data=json.dumps(query),
    # Without a timeout, requests waits forever on an unreachable endpoint.
    timeout=30,
)
if not response.ok:
    # Show the raw error body first (it may not be valid JSON), then fail
    # with the HTTP status instead of a misleading JSON decode error.
    print(response.text)
    response.raise_for_status()
print(response.json())
from awsglue.utils import getResolvedOptions
import boto3
import sys
import json
# Example 2: query Elasticsearch from a Glue job, signing requests with the
# credentials resolved by the default boto3 session.
# NOTE: this snippet uses `requests` and `AWS4Auth`, which are imported near the
# top of this file; import them as well when running this snippet on its own.

# Resolve the job arguments; 'JOB_NAME' is requested from sys.argv.
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
# Glue and Elasticsearch are in the same VPC, so the internal endpoint can be used
es_endpoint = 'https://your-es-endpoint'
region = 'your-region'
index = 'your-index-name'
# Authentication: sign requests for the 'es' service with the session
# credentials (presumably the Glue job's IAM role — confirm for your setup).
# No access key / secret key needs to be written into the script here.
service = 'es'
credentials = boto3.Session().get_credentials()
if credentials is None:
    # get_credentials() returns None when no credentials can be found;
    # fail with a clear message instead of an AttributeError below.
    raise RuntimeError('No AWS credentials available from the boto3 session')
awsauth = AWS4Auth(
    credentials.access_key,
    credentials.secret_key,
    region,
    service,
    session_token=credentials.token,
)
# Send the search request
query_url = es_endpoint + '/' + index + '/_search?'
payload = {
    'query': {
        'match_all': {},
    },
    'size': 10,
}
headers = {'Content-Type': 'application/json'}
response = requests.get(
    query_url,
    headers=headers,
    auth=awsauth,
    data=json.dumps(payload),
    # Without a timeout, requests waits forever on an unreachable endpoint.
    timeout=30,
)
result = response.content.decode('utf-8')
print(result)
# The body is printed first so error details stay visible; then an HTTP error
# status fails the job instead of being silently treated as success.
response.raise_for_status()
注意:上面两段代码需要根据实际使用的 Elasticsearch 版本和 ES 集群的角色(权限)配置进行修改。例如,Elasticsearch 7.x 及以上版本已弃用 URL 中的文档类型(doc_type),此时应直接请求 `/索引名/_search`。
通过以上步骤,即可在 AWS Glue 的 Python shell Job 中成功连接到 Elasticsearch 集群。
上一篇:AWS Glue Python FileNotFoundError: [Errno 2] No such file or directory(AWS Glue Python 文件未找到错误:[Errno 2] 没有这样的文件或目录)