任务可能被跳过的原因是通过设置的依赖性链,前一个任务并未成功地完成或未完成。我们可以通过检查前一个任务日志文件或使用以下代码示例来解决这个问题:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': True,
'start_date': datetime(2021, 1, 1),
'retries': 3,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('example_dag', default_args=default_args, schedule_interval=timedelta(days=1))
t1 = BashOperator(
task_id='task_1',
bash_command='echo 1',
dag=dag,
)
t2 = BashOperator(
task_id='task_2',
bash_command='echo 2',
dag=dag,
)
t1 >> t2
这段代码将创建一个有效的依赖性链,其中任务2依赖于任务1的成功。