首先,需要明确Airflow DAG中的任务是如何被调度的。对于重复性任务,Airflow会定期检查它们是否可以开始运行。这个时间间隔由DAG的"schedule_interval"属性控制。
如果任务的"start_date"属性设置为以前的日期,Airflow会标记该任务为"up_for_retry"状态并等待以后的调度运行。这是因为Airflow只会尝试在任务的'start_date”之后执行该任务。因此,任务将被视为未能启动,直到下一个调度间隔到来。这为指定任务之间的依赖关系提供了灵活性。
要解决这个问题,可以通过一些方法将任务的"start_date"属性设置为当前日期,或者稍后的日期。其中一种方法是使用Python的datetime模块:
from datetime import datetime, timedelta
start_date = datetime.today() # or datetime(2022, 5, 1)
dag = DAG(
dag_id='my_dag_id',
start_date=start_date,
schedule_interval='@daily'
)
如果需要在DAG文件中定义多个任务,可以使用变量来设置它们的"start_date"属性,以便可以为每个任务设置不同的日期。例如:
from datetime import datetime, timedelta
start_date_first_task = datetime.today() - timedelta(days=1)
start_date_second_task = datetime.today() - timedelta(hours=12)
dag = DAG(
dag_id='my_dag_id',
default_args={
'owner': 'airflow',
'start_date': datetime.today()
},
schedule_interval='@daily'
)
task1 = BashOperator(
task_id='task1',
bash_command='echo "Hello"',
start_date=start_date_first_task,
dag=dag
)
task2 = BashOperator(
task_id='task2',
bash_command='echo "World"',
start_date=start_date_second_task,
dag=dag
)
task2.set_upstream(task1)
这个例子中,两个任务分别被设置为前一天和前12小时开始。注意,在DAG的default_args中可以设置默认的start_date,这样在定义任务时就无需显式指定。