要解决Airflow中HdfsSensor运算符无法正常工作的问题,你可以尝试以下几个步骤:
pip install hdfs3
[hdfs]
# hdfs的连接信息
host =
port =
from airflow import DAG
from airflow.contrib.sensors.hdfs_sensor import HdfsSensor
from datetime import datetime
with DAG('hdfs_sensor_example', start_date=datetime(2022, 1, 1)) as dag:
hdfs_sensor = HdfsSensor(
task_id='hdfs_sensor_task',
filepath='/path/to/hdfs/file',
poke_interval=60, # 每60秒检查一次HDFS文件是否存在
timeout=600, # 超时时间为600秒
)
在这个示例中,HdfsSensor运算符将会每60秒检查一次HDFS文件的存在性,超时时间为600秒。你可以根据实际需求进行调整。
如果你按照上述步骤进行设置,并且仍然遇到问题,你可以尝试查看Airflow的日志文件,以便进一步排查错误。