可以使用以下代码示例解决此问题:
from airflow import DAG
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2021, 1, 1),
'retries': 1,
'retry_delay': timedelta(minutes=1),
}
dag = DAG(
'file_sensor_dag',
default_args=default_args,
description='File Sensor Example DAG',
schedule_interval=None,
)
wait_for_file = FileSensor(
task_id='wait_for_file',
filepath='/path/to/your/file',
poke_interval=5,
dag=dag,
)
echo_command = BashOperator(
task_id='echo_command',
bash_command='echo "Hello World!"',
dag=dag,
)
wait_for_file >> echo_command
在上面的代码中,我们使用了 FileSensor Operator 来检测文件是否可用。poke_interval
参数定义了检测文件可用的时间间隔。如果文件是可用的,那么我们使用 BashOperator 来运行命令行命令。
需要注意的是,如果您使用的是 Airflow 1.10.x 版本,那么在导入 FileSensor 时需要添加 from airflow.contrib.operators import FileSensor
。但是,在 Airflow 2.x 版本中,推荐使用 from airflow.sensors.filesystem import FileSensor
。