在使用Airflow的ExternalTaskSensor时,如果任务没有被触发,可能有以下几个原因:
确保依赖任务已经成功运行并且处于“success”状态。ExternalTaskSensor需要等待依赖任务完成后才会触发。
检查依赖任务的task_id是否正确。确保task_id与依赖任务的task_id完全匹配。
检查依赖任务所在的DAG的dag_id是否正确。确保dag_id与依赖任务所在的DAG的dag_id完全匹配。
检查依赖任务所在的DAG是否已经被加载到Airflow中。可以使用airflow dags list
命令检查DAG的状态。
以下是一个示例代码,演示如何正确使用ExternalTaskSensor:
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.sensors import ExternalTaskSensor
from datetime import datetime
# 定义DAG
dag = DAG('my_dag', description='My DAG', schedule_interval=None, start_date=datetime(2022, 1, 1))
# 定义依赖任务
dependency_task = DummyOperator(task_id='dependency_task', dag=dag)
# 定义需要等待的任务
sensor_task = ExternalTaskSensor(
task_id='sensor_task',
external_dag_id='my_dag',
external_task_id='dependency_task',
dag=dag
)
# 定义后续任务
final_task = DummyOperator(task_id='final_task', dag=dag)
# 设置任务的依赖关系
dependency_task >> sensor_task >> final_task
在这个示例中,sensor_task
是一个ExternalTaskSensor,它会等待dependency_task
任务完成后才会触发。确保sensor_task
的external_dag_id
和external_task_id
与dependency_task
的dag_id和task_id匹配。
记得将以上代码保存为一个Python文件,然后通过airflow dags unpause
命令启用DAG,使用airflow scheduler
命令启动调度器。
这样,当dependency_task
任务成功完成后,sensor_task
任务就会被触发执行。