出现这个问题的原因是Airflow在执行BashOperator的命令时,发现目录不存在而无法自动创建。处理方法有两种:
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
cmd = """
mkdir -p /home/airflow/logs/test && \
echo "Hello World!" > /home/airflow/logs/test/test.txt
"""
with DAG('test_dag', schedule_interval=None, start_date=days_ago(2)) as dag:
task = BashOperator(
task_id='create_directory',
bash_command=cmd,
)
在上面的代码示例中,在BashOperator的命令中增加了创建目录的命令“mkdir -p /home/airflow/logs/test”,并且将“&&”符号用来包含多个命令。