在Airflow中,可以使用BranchPythonOperator实现条件任务执行。该操作符可以通过返回任务ID来控制执行哪个任务。
以下是代码示例:
from airflow import DAG
from airflow.operators.python_operator import BranchPythonOperator
from datetime import datetime
def branch_func():
if condition:
return 'task_a'
else:
return 'task_b'
dag = DAG('conditional_dag', schedule_interval='@daily')
branch = BranchPythonOperator(
task_id='branch_task',
python_callable=branch_func,
dag=dag,
)
task_a = DummyOperator(
task_id='task_a',
dag=dag
)
task_b = DummyOperator(
task_id='task_b',
dag=dag
)
branch >> [task_a, task_b]
在上面的代码中,根据条件判断选择要执行的任务。如果condition为True,则选择执行task_a;反之,则选择执行task_b。
注意,branch_func中的条件可以是任何适用于Python可调用函数的逻辑判断语句。