在Airflow中,可以使用XCom(交流)来在任务之间传递数据。XCom是一个Airflow的功能,它允许任务之间共享数据。
以下是一个示例,展示了如何将一个任务的输出数据作为另一个任务的输入数据传递:
首先,在第一个任务中生成输出数据:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.models import XCom
def generate_output_data(**context):
output_data = "Hello, World!"
context['ti'].xcom_push(key='output_data', value=output_data)
dag = DAG('data_transfer_example', start_date=datetime(2021, 1, 1))
task1 = PythonOperator(
task_id='generate_output_data',
python_callable=generate_output_data,
provide_context=True,
dag=dag
)
注意上面的generate_output_data
函数使用context['ti'].xcom_push
将输出数据推送到XCom。
接下来,在第二个任务中获取输入数据:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.models import XCom
def process_input_data(**context):
input_data = context['ti'].xcom_pull(key='output_data')
print(input_data)
dag = DAG('data_transfer_example', start_date=datetime(2021, 1, 1))
task2 = PythonOperator(
task_id='process_input_data',
python_callable=process_input_data,
provide_context=True,
dag=dag
)
上述任务中的process_input_data
函数使用context['ti'].xcom_pull
从XCom中获取输入数据。
最后,将这两个任务连接起来:
task1 >> task2
通过将task1
的输出作为task2
的输入,任务之间的数据传递就完成了。
注意:这只是一个简单的示例,实际上可以在多个任务之间传递更多的数据。