如果需要使用 Airflow Elasticsearch Provider,则需使用 HTTP 协议;若需使用 HTTPS 协议,可以使用 Elasticsearch Hook 和 Elasticsearch Sensors。以下是使用 Elasticsearch Hook 的示例代码:
from airflow.hooks.elasticsearch_hook import ElasticsearchHook
# NOTE(review): the `airflow.hooks.elasticsearch_hook` import path, the
# `http_conn_id` keyword and the `create_index` method could not be confirmed
# against the Airflow Elasticsearch provider — verify before relying on them.
es_hook = ElasticsearchHook(http_conn_id='elasticsearch')

# Create the index: one shard, one replica, and two text fields.
shard_settings = {"number_of_shards": 1, "number_of_replicas": 1}
text_fields = {field: {"type": "text"} for field in ("title", "content")}
index_config = {
    "settings": shard_settings,
    "mappings": {"properties": text_fields},
}
es_hook.create_index(index='my_index', body=index_config)
# Write data
# Each dict in `data` is the source of one document to index.
data = [
    {
        'title': 'Airflow is awesome',
        'content': 'I love using Airflow for my data pipeline.',
    },
    {
        'title': 'Elasticsearch is awesome',
        'content': 'I love using Elasticsearch for my search engine.',
    },
]
# The Elasticsearch Bulk API expects an action/metadata line before every
# document source, so pair each document with an "index" action instead of
# sending the bare documents (which would be parsed as malformed actions).
# The empty action body is enough because the target index is passed via
# the `index` argument.
# NOTE(review): assumes es_hook.bulk forwards `body` unchanged to the
# Elasticsearch bulk endpoint — confirm against the hook implementation.
bulk_body = []
for doc in data:
    bulk_body.extend(({'index': {}}, doc))
es_hook.bulk(index='my_index', body=bulk_body)
# Query data: full-text match on the `content` field for the term "Airflow".
match_clause = {'content': 'Airflow'}
query = {'query': {'match': match_clause}}
# NOTE(review): the shape of `result` depends on the hook's `search`
# implementation, which is not visible here — confirm before indexing into it.
result = es_hook.search(index='my_index', body=query)