在Airflow中,有三种方式可以实现工作流之间的分支或多路径逻辑:分支、单独的DAG和任务内的分支。这些方法每个都有自己的优势和劣势。
对于简单的工作流,建议使用分支操作,因为这种方法比较容易实现和维护。下面是使用分支操作的代码示例:
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2021, 8, 1),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('branching_dag', default_args=default_args, catchup=False)
branching_task = BranchPythonOperator(
task_id='branching',
python_callable=lambda: 'branch_a' if True else 'branch_b',
dag=dag,
)
branch_a_task = DummyOperator(
task_id='branch_a',
dag=dag,
)
branch_b_task = DummyOperator(
task_id='branch_b',
dag=dag,
)
branching_task >> [branch_a_task, branch_b_task]
如果分支条件比较复杂,或者想要更好地组织工作流,可以使用单独的DAG。这样可以将相关的任务分组,并将它们放在一个文件中,使代码易于管理。下面是使用单独的DAG的代码示例:
# In dag_one.py
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2021, 8, 1),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('dag_one', default_args=default_args, catchup=False)
task_a = BashOperator(
task_id='task_a',
bash_command='echo "Hello from task_a"',
dag=dag,
)
task_b = BashOperator(
task_id='task_b',
bash_command='echo "Hello from task_b"',
dag=dag,
)
task_a >> task_b
# In dag_two.py
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args
下一篇:Airflow:优先处理Dags