检查上游任务的定义是否正确,确保上游任务成功执行后,状态正确地传递给下游任务。也可以尝试重新启动任务或清除任务状态来解决此问题。
代码示例:
def my_upstream_task(): # 执行代码 return result
def my_downstream_task(**context): upstream_result = context['ti'].xcom_pull(task_ids='my_upstream_task') # 执行代码使用上游任务的结果
t1 = PythonOperator( task_id='my_upstream_task', python_callable=my_upstream_task, dag=my_dag ) t2 = PythonOperator( task_id='my_downstream_task', python_callable=my_downstream_task, provide_context=True, dag=my_dag )
t2.set_upstream(t1)
airflow clear my_dag --only_failed airflow backfill my_dag -s 20221201 -e 20221201
airflow clear my_dag my_task -s 20221201 -e 20221201
airflow clear my_dag -s 20221201 -e 20221201