dag = DAG('my_dag', default_args=default_args, schedule_interval='@daily',
dagrun_timeout=timedelta(minutes=60), catchup=False)
filepath = '/usr/local/airflow/dags/my_dag.py'
with open(filepath, 'r') as f:
code = compile(f.read(), filepath, 'exec')
exec(code, globals(), locals())
检查DAG文件名是否符合规范:DAG文件名应该以.py结尾,并避免使用空格和特殊字符。
检查DAG文件是否存在语法错误:在命令行中使用python
检查文件是否能够被正常导入。
检查Airflow的日志:使用airflow logs my_dag
命令查看DAG是否有任何错误信息。
重启Airflow:尝试重启Airflow并重新加载DAG,使用命令airflow resetdb && airflow initdb && airflow scheduler
。
检查Airflow的权限设置:确保DAG文件夹和DAG文件的权限被正确设置,可以使用命令sudo chmod -R 777 airflow
修改权限。