在Airflow(MWAA)中,任务进入失败状态而从未运行可能是由于以下几个原因引起的:
# 例如,如果任务B依赖于任务A:
dag = DAG('my_dag', default_args=default_args, schedule_interval='@daily')
task_a = BashOperator(
task_id='task_a',
bash_command='echo "Task A"',
dag=dag,
)
task_b = BashOperator(
task_id='task_b',
bash_command='echo "Task B"',
dag=dag,
)
task_b.set_upstream(task_a) # 设置任务B依赖于任务A
# 例如,如果任务需要访问某个目录下的文件:
dag = DAG('my_dag', default_args=default_args, schedule_interval='@daily')
task_a = BashOperator(
task_id='task_a',
bash_command='cat /path/to/file.txt', # 确保文件存在
dag=dag,
)
# 例如,增加任务的CPU和内存配置:
dag = DAG('my_dag', default_args=default_args, schedule_interval='@daily')
task_a = BashOperator(
task_id='task_a',
bash_command='echo "Task A"',
dag=dag,
# 增加任务的CPU和内存配置
resources={'cpus': 2, 'memory': '2G'},
)
# 例如,检查任务的bash命令是否正确:
dag = DAG('my_dag', default_args=default_args, schedule_interval='@daily')
task_a = BashOperator(
task_id='task_a',
bash_command='echo "Task A"', # 确保命令正确
dag=dag,
)
确保以上可能导致任务进入失败状态而从未运行的问题得到解决后,重新部署DAG并触发任务的运行。