from airflow.hooks.http_hook import HttpHook
def fetch_api():
hook = HttpHook(method='GET', http_conn_id='http_api')
response = hook.run('endpoint/url')
if response.status_code == 200:
return response.content
else:
raise Exception(f'API call failed with HTTP status {response.status_code}')
其中,http_conn_id对应Airflow中的Connection,存储了API的URL和其他参数。run
方法执行API请求并返回响应对象,可以用来检查API响应的状态码和内容。在自己的代码中调用fetch_api
方法即可,对于任何错误的响应都会抛出一个异常,可以在DAG中捕获并处理。
下一篇:Airflow旋转加密密钥