在Airflow中连接到AWS的示例代码如下:
首先,您需要在Airflow中创建一个连接来连接到AWS。可以通过以下方式执行:
在Airflow的任务代码中,您可以使用以下代码来获取AWS连接并执行AWS操作:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.hooks.base_hook import BaseHook
def my_task():
# 获取AWS连接
aws_conn = BaseHook.get_connection("aws_conn")
# 执行AWS操作,例如:
import boto3
s3 = boto3.client('s3',
aws_access_key_id=aws_conn.login,
aws_secret_access_key=aws_conn.password,
region_name=aws_conn.extra_dejson.get('region_name', 'us-west-2'))
# 其他AWS操作...
dag = DAG('my_dag', schedule_interval=None, default_args={})
task = PythonOperator(task_id='my_task', python_callable=my_task, dag=dag)
在my_task
函数中,我们首先使用BaseHook.get_connection
方法来获取之前创建的AWS连接。然后,我们可以使用连接的登录和密码属性来执行AWS操作,例如使用boto3
库执行S3操作。
请注意,您需要根据您的具体情况修改连接的唯一标识符、AWS连接参数和AWS操作代码。