问题是由于Airflow 1.10.15版本中的Jinja模板渲染器更改所致。要解决此问题,可以在PythonBranchOperator中使用软件包内置的MultipleBranchPythonOperator类而不是自己创建自定义操作器,如下所示:
from airflow.operators.python_operator import BranchPythonOperator, PythonOperator from airflow.operators.bash_operator import BashOperator from datetime import datetime, timedelta from airflow import DAG
def return_branch(**kwargs): if kwargs['ds'] == '2022-01-01': return "run_this_first" else: return "run_this_last"
default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2021, 1, 1), 'email': ['airflow@example.com'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=1), }
dag = DAG( 'branch_python_operator_test', default_args=default_args, catchup=False, schedule_interval="@daily")
branching = BranchPythonOperator( task_id='branching', dag=dag, provide_context=True, python_callable=return_branch)
run_this_first = BashOperator( task_id='run_this_first', bash_command='echo 1', dag=dag)
run_this_last = BashOperator( task_id='run_this_last', bash_command='echo 2', dag=dag)
branching >> [run_this_first, run_this_last]
在此示例中,我们使用BranchPythonOperator返回要执行的任务名称,然后使用>>将此任务链接到两个BashOperator。