检查Airflow配置文件中的默认队列,如果设置为default
,则可以考虑将其更改为其他队列名解决。如果更改默认队列不起作用,可以尝试手动清除Airflow的任务队列并重新启动Airflow服务。另外,还可以尝试使用Poke()
方法代替FileSensor()
方法来检测文件是否存在,如下所示:
from airflow.operators.sensors import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
class MyFileSensor(BaseSensorOperator):
@apply_defaults
def __init__(
self,
filepath,
*args, **kwargs):
super(MyFileSensor, self).__init__(*args, **kwargs)
self.filepath = filepath
def poke(self, context):
self.log.info('Poking for file %s', self.filepath)
full_path = os.path.join(self.filepath)
if os.path.isfile(full_path):
self.log.info('Success! File %s exists.', self.filepath)
return True
else:
self.log.warn('File %s does not exist yet.', self.filepath)
return False
使用方法:
poll_for_file = MyFileSensor(
task_id='poll_for_file',
poke_interval=10,
filepath='/path/to/my/file',
dag=my_dag)