在Airflow的任务中创建的临时文件只存在于该任务的执行期间,而不会跨任务存在。因此,每个任务都应该有自己独立的临时文件。以下是一个示例:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import tempfile
def create_temp_file():
# create a temp file
temp_file = tempfile.NamedTemporaryFile(delete=False)
temp_file.write(b"Hello World!")
temp_file.close()
print(f"Created temp file: {temp_file.name}")
def use_temp_file():
# read the contents of temp file
with open("temp_file.txt", "r") as f:
contents = f.read()
print(f"Contents of temp file: {contents}")
dag = DAG(dag_id="temp_files_dag")
create_temp_file_task = PythonOperator(
task_id="create_temp_file",
python_callable=create_temp_file,
dag=dag
)
use_temp_file_task = PythonOperator(
task_id="use_temp_file",
python_callable=use_temp_file,
dag=dag
)
create_temp_file_task >> use_temp_file_task
在上面的示例中,第一个任务 create_temp_file
用于创建一个临时文件,而第二个任务 use_temp_file
用于读取该临时文件的内容。 create_temp_file
在执行期间创建临时文件,并在任务完成时自动删除。虽然 use_temp_file
同样需要访问临时文件,但它重新打开了一个新的文件,并不会访问 create_temp_file
创建的临时文件。因此,每个任务都可以使用独立的临时文件,而不会受到其他任务的干扰。