这可能是由于 Airflow 中的一个 bug 导致的。要解决这个问题,可以在 DAG 文件中添加以下代码:
from airflow.utils.state import State from datetime import datetime, timedelta
from airflow.operators.dummy_operator import DummyOperator
default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2019, 1, 1), 'email': ['airflow@example.com'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), 'catchup': False }
dag = DAG( 'my_dag', default_args=default_args, description='My DAG', schedule_interval='0 0 * * *', catchup=False )
begin = DummyOperator(task_id='begin', dag=dag) end = DummyOperator(task_id='end', dag=dag)
def fix_dag(dag): """ 修复 Airflow 的 bug,以便使 prev_execution_date_success 正常工作。 """
def fix_task(task):
task.prev_execution_date_success = task.execution_context.get(
'prev_execution_date_success', None)
for task in dag.tasks:
if task.task_type != 'SubDagOperator':
fix_task(task)
else:
subdag = task.subdag
if subdag is not None:
fix_dag(subdag)
fix_dag(dag)
begin >> end
这个 fix_dag() 函数将执行一个递归遍历 DAG 的操作,以确保 prev_execution_date_success 在每个任务中都被正确设置。这个函数将在 DAG 的构造函数中被调用,以便在 DAG 加载时自动执行。