在Airflow 2.x中,当任务之间存在循环依赖时,可能会出现任务不能识别的情况。这通常是因为任务名称与DAG ID相同,导致Airflow无法区分哪个任务属于哪个DAG。
解决这个问题的方法是使用op_args和op_kwargs来定义每个任务,并为每个任务分配唯一的任务ID。例如:
from airflow import DAG from airflow.operators.python_operator import PythonOperator from datetime import datetime
dag = DAG(dag_id='my_dag', start_date=datetime.now())
def task1(**context): print('This is task1')
def task2(**context): print('This is task2')
def task3(**context): print('This is task3')
op1 = PythonOperator( task_id='op1', python_callable=task1, op_kwargs={}, dag=dag )
op2 = PythonOperator( task_id='op2', python_callable=task2, op_kwargs={}, dag=dag )
op3 = PythonOperator( task_id='op3', python_callable=task3, op_kwargs={}, dag=dag )
op1 >> op2 op2 >> op3 op3 >> op1