在Airflow中,可以使用ExternalTaskSensor来等待另一个任务的完成。如果要手动触发ExternalTaskSensor,可以通过以下方法:
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor
# 定义DAG的参数
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2021, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}
# 创建DAG
dag = DAG('manual_trigger_dag', 
          default_args=default_args, 
          schedule_interval=None)
# 创建ExternalTaskSensor依赖的任务
task1 = ExternalTaskSensor(
    task_id='wait_for_task',
    external_dag_id='your_dag_id',
    external_task_id='your_task_id',
    dag=dag
)
# 创建手动触发的任务
task2 = DummyOperator(
    task_id='manual_trigger_task',
    dag=dag
)
# 设置任务的依赖关系
task1 >> task2
在上面的代码中,我们创建了一个名为manual_trigger_dag的新DAG。其中包含了一个ExternalTaskSensor(wait_for_task)和一个DummyOperator(manual_trigger_task)。ExternalTaskSensor会等待your_dag_id中的your_task_id任务完成后才会继续执行。DummyOperator是一个空操作符,用于手动触发的任务。
运行这个新的DAG,可以在Airflow UI中手动触发DAG。找到manual_trigger_dag并点击"Trigger DAG"按钮即可触发手动触发的任务执行。
这样,当手动触发任务执行时,ExternalTaskSensor会等待your_dag_id中的your_task_id任务完成后才会继续执行。