Airflow 的 worker 是各自运行在独立的进程中的,因此它们是隔离的。每个 worker 执行任务时都会在本地创建临时文件,并且任务执行完毕后自动清理这些文件。
下面是一个简单的 Python 代码示例,展示了如何在 Airflow DAG 中使用 BashOperator 来创建临时文件并将其传递到另一个任务中:
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
default_args = {
'owner': 'airflow',
'start_date': datetime(2021, 1, 1)
}
with DAG('example_dag', default_args=default_args, schedule_interval=None) as dag:
# 创建临时文件
create_temp_file = BashOperator(
task_id='create_temp_file',
bash_command='echo "Hello, World" > /tmp/hello.txt',
)
# 在不同的任务中使用临时文件
use_temp_file = BashOperator(
task_id='use_temp_file',
bash_command='cat /tmp/hello.txt',
dag=dag,
)
create_temp_file >> use_temp_file
在上面的 DAG 中,先创建了一个名为 "create_temp_file" 的 BashOperator 任务来创建临时文件 "/tmp/hello.txt",然后将其连接到另一个 BashOperator 任务 "use_temp_file",该任务将查看该文件的内容并将其打印到输出中。由于 Airflow 的 worker 是隔离的,因此这两个任务在运行时不会共享文件系统。