- 首先需要在 DAG(有向无环图)文件中定义一个 PythonOperator 任务。此任务应包含要处理的文件路径列表:
# Import the necessary modules
import os
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
# Build the DAG object that the tasks below attach to.
# NOTE(review): `args` is not defined in the visible part of this file; it is
# presumably a default-args dict defined elsewhere -- confirm before running.
dag = DAG(
    dag_id='file_processing_task',
    schedule_interval=None,
    default_args=args,
)
# Paths handed to the file-processing task (placeholder values)
file_list = [
    '/path/to/file1',
    '/path/to/file2',
    '/path/to/file3',
]
def process_files(file_paths):
    """Walk through ``file_paths`` one entry at a time.

    The loop body is still a placeholder, so a call currently does nothing
    beyond iterating over the given paths and implicitly returning ``None``.

    Args:
        file_paths: Iterable of file paths to process.
    """
    for path in file_paths:
        # Add your file processing code here
        pass
# Register process_files as a task on the DAG; file_list is passed to it as
# its single positional argument.
process_file_operator = PythonOperator(
    dag=dag,
    task_id='file_processing_task',
    op_args=[file_list],
    python_callable=process_files,
)
- 接下来,可以在另一个 PythonOperator 任务中获取并处理已处理文件的相关信息。例如,可以统计文件夹中的文件数量,或统计各个文件的行数:
# Count the files stored directly inside a directory
def count_files(directory):
    """Return the number of regular files directly inside ``directory``.

    Only entries that are files (symlinks to files included) are counted.
    Subdirectories are skipped, so the result is a file count rather than a
    count of every directory entry. The listing is not recursive.

    Args:
        directory: Path of the directory to inspect.

    Returns:
        The file count formatted as a string.

    Raises:
        OSError: If ``directory`` cannot be listed (for example it does not
            exist or is not a directory).
    """
    # scandir exposes the entry type without a separate stat call per name,
    # and the context manager closes the directory handle promptly.
    with os.scandir(directory) as entries:
        file_count = sum(1 for entry in entries if entry.is_file())
    # Return the file count as a string
    return str(file_count)
# Task that runs count_files against a fixed folder path
get_file_count_operator = PythonOperator(
    dag=dag,
    task_id='get_file_count_task',
    op_args=['/path/to/file/folder'],
    python_callable=count_files,
)
# Define a function to count the number of lines in a file
def count_lines(file_path):
# Use the Python built-in file open function to read the file
with open(file_path, 'r') as file:
# Use the built-in len() function to count the number of lines in the file
line_count = len(file.readlines())
# Return the line count as a string
return str(line