首先,请确保您要使用的文件确实存在于指定的路径中。然后,您需要在DAG文件中使用完整的本地文件路径,而不仅仅是文件名。比如,如果您的文件名为example.txt,它被存储在/home/airflow/目录中,则应该使用/home/airflow/example.txt而不是example.txt。
以下是一个使用完整路径的示例代码:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2021, 1, 1),
'retries': 1
}
with DAG('example_dag', default_args=default_args, schedule_interval='@daily') as dag:
task1 = BashOperator(
task_id='example_task',
bash_command='echo "Hello World" > /home/airflow/example.txt',
)
task2 = BashOperator(
task_id='example_task_2',
bash_command='cat /home/airflow/example.txt',
)
task1 >> task2
在这种情况下,任务1将在/home/airflow目录中创建一个名为example.txt的文件,然后任务2将读取并显示其内容。