在Airflow中,可以使用XCom来在任务之间传递数据和返回值。下面是一个示例解决方案,展示了如何使用返回值连接任务。
首先,我们创建两个简单的任务,一个任务生成一个随机数,另一个任务将随机数加倍并输出。
from random import randint
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
def generate_random_number():
random_number = randint(1, 100)
return random_number
def multiply_by_two(**context):
random_number = context['task_instance'].xcom_pull(task_ids='generate_random_number')
multiplied_number = random_number * 2
print(f"The multiplied number is: {multiplied_number}")
dag = DAG('example_dag', description='Example DAG', schedule_interval='@once')
generate_operator = PythonOperator(
task_id='generate_random_number',
python_callable=generate_random_number,
dag=dag
)
multiply_operator = PythonOperator(
task_id='multiply_by_two',
python_callable=multiply_by_two,
provide_context=True,
dag=dag
)
generate_operator >> multiply_operator
在这个示例中,我们首先定义了两个任务generate_random_number
和multiply_by_two
。generate_random_number
任务生成一个随机数,并将其作为返回值返回。multiply_by_two
任务使用XCom来获取上一个任务的返回值,并将其加倍。
然后,我们创建了一个DAG对象,并将两个任务添加到DAG中。使用provide_context=True
来将上下文传递给任务,以便我们可以在multiply_by_two
任务中访问上一个任务的返回值。
最后,我们使用>>
运算符将两个任务连接起来,表示generate_random_number
任务的返回值将作为输入传递给multiply_by_two
任务。
这样,当DAG运行时,首先会执行generate_random_number
任务,然后将返回的随机数传递给multiply_by_two
任务,最后multiply_by_two
任务会将加倍后的结果输出。