在Airflow中,可以通过使用ExternalTaskSensor来实现跨DAG的依赖关系。ExternalTaskSensor可以用来等待另一个DAG的任务完成,然后再启动当前DAG的任务。下面是一个示例:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor
from datetime import datetime, timedelta
# 定义DAG的默认参数
default_args = {
'owner': 'airflow',
'start_date': datetime(2021, 1, 1),
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
# 定义DAG
dag = DAG(
'DAG_A',
default_args=default_args,
schedule_interval=timedelta(days=1)
)
# 定义DAG中的任务
task_1 = BashOperator(
task_id='task_1',
bash_command='echo "This is Task 1"',
dag=dag
)
task_2 = ExternalTaskSensor(
task_id='task_2',
external_dag_id='DAG_B',
external_task_id='task_1',
trigger_rule='all_done',
dag=dag
)
task_3 = BashOperator(
task_id='task_3',
bash_command='echo "This is Task 3"',
dag=dag
)
# 设置任务之间的关系
task_1 >> task_2 >> task_3
# 定义DAG
dag = DAG(
'DAG_B',
default_args=default_args,
schedule_interval=timedelta(days=1)
)
# 定义DAG中的任务
task_1 = BashOperator(
task_id='task_1',
bash_command='echo "This is Task 1 in DAG B"',
dag=dag
)
# 返回DAG对象
return dag
在上面的代码中,定义了两个DAG,即DAG_A和DAG_B。DAG_A中包含三个任务,其中task_2是一个ExternalTaskSensor,它等待DAG_B中