在Airflow中,可以使用ExternalTaskSensor来等待另一个任务的完成。如果要手动触发ExternalTaskSensor,可以通过以下方法:
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor
# 定义DAG的参数
default_args = {
'owner': 'airflow',
'start_date': datetime(2021, 1, 1),
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
# 创建DAG
dag = DAG('manual_trigger_dag',
default_args=default_args,
schedule_interval=None)
# 创建ExternalTaskSensor依赖的任务
task1 = ExternalTaskSensor(
task_id='wait_for_task',
external_dag_id='your_dag_id',
external_task_id='your_task_id',
dag=dag
)
# 创建手动触发的任务
task2 = DummyOperator(
task_id='manual_trigger_task',
dag=dag
)
# 设置任务的依赖关系
task1 >> task2
在上面的代码中,我们创建了一个名为manual_trigger_dag
的新DAG。其中包含了一个ExternalTaskSensor(wait_for_task
)和一个DummyOperator(manual_trigger_task
)。ExternalTaskSensor会等待your_dag_id
中的your_task_id
任务完成后才会继续执行。DummyOperator是一个空操作符,用于手动触发的任务。
运行这个新的DAG,可以在Airflow UI中手动触发DAG。找到manual_trigger_dag
并点击"Trigger DAG"按钮即可触发手动触发的任务执行。
这样,当手动触发任务执行时,ExternalTaskSensor会等待your_dag_id
中的your_task_id
任务完成后才会继续执行。