一种可能的解决方法是将外部任务感应器的poke_interval参数设置为一个较小的值,例如1秒,而不是默认的30秒。另外,还可以在任务依赖关系中添加其他限制条件,以减少poke的次数。
示例代码:
from airflow import DAG from airflow.sensors.external_task_sensor import ExternalTaskSensor from datetime import datetime, timedelta
dag = DAG( dag_id='example_dag', start_date=datetime(2021, 1, 1), schedule_interval=None, )
sensor = ExternalTaskSensor( task_id='sensor_task', external_dag_id='example_external_dag', external_task_id='example_task', poke_interval=1, # 设置poke_interval为1秒 dag=dag, )
task1 = BashOperator( task_id='task1', bash_command='echo "Hello World"', dag=dag, )
task1 >> sensor # 添加任务依赖关系