# Pull AWS credentials from Airflow Variables.
# NOTE(review): these two variables are never referenced below — the S3Hook
# later uses the 'aws_default' connection instead; confirm they are needed.
from airflow.models import Variable

AWS_ACCESS_KEY_ID = Variable.get("aws_access_key_id")
AWS_SECRET_ACCESS_KEY = Variable.get("aws_secret_access_key")

# Full S3 URI of the folder that holds the DAG files.
s3_bucket = 's3://example-bucket/dags'
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
# Arguments shared by every task in this DAG.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2022, 2, 1),
}

dag = DAG('example_dag', default_args=default_args)

# Two trivial shell tasks that make up the example pipeline.
task1 = BashOperator(task_id='task1', bash_command='echo "Hello World"', dag=dag)
task2 = BashOperator(task_id='task2', bash_command='echo "Hello Again World"', dag=dag)

# task1 must finish before task2 starts.
task1 >> task2
确保 Airflow 中的 DAG 文件名与 S3 路径中的 DAG 文件名相同，并且文件没有语法错误。
在 DAG 中增加一个 PythonOperator，调用 Airflow 提供的 S3Hook（其底层封装了 AWS SDK）下载 DAG 文件，例如：
from airflow.hooks.S3_hook import S3Hook


def download_dag_file():
    """Download example_dag.py from S3 into the local Airflow DAGs folder.

    Credentials come from the 'aws_default' Airflow connection.
    """
    s3_hook = S3Hook(aws_conn_id='aws_default')
    # BUG FIX: the original passed the full 's3://example-bucket/dags' URI as
    # the object key, which can never match an object in S3, and treated the
    # third argument of S3Hook.download_file as a destination filename, which
    # it is not. Split the URI into bucket and prefix, then download via the
    # underlying boto3 client, which accepts an explicit destination path.
    bucket_name, prefix = S3Hook.parse_s3_url(s3_bucket)
    s3_hook.get_conn().download_file(
        Bucket=bucket_name,
        Key=f'{prefix}/example_dag.py',
        Filename='/usr/local/airflow/dags/example_dag.py',
    )
# BUG FIX: PythonOperator was used here without ever being imported, which
# makes the whole file fail to parse with a NameError; the import is added
# to the import block at the top of the file.
# This task refreshes the DAG file from S3 before the example tasks run.
download_dag_task = PythonOperator(
    task_id='download_dag',
    python_callable=download_dag_file,
    dag=dag,
)

# Download the latest DAG file first, then start the pipeline.
download_dag_task >> task1
通过以上步骤，可以解决 Airflow 从 S3 同步 DAG 文件时导致 DAG 执行出错的问题。