在Airflow Python中使用yield语句可以实现生成器函数,可用于生成任务实例,例如:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
def generate_task_instances():
for i in range(5):
yield {
'task_id': f'task_{i}',
'param': i
}
def process_task(**kwargs):
task_id = kwargs['task_instance'].task_id
param = kwargs['task_instance'].xcom_pull(task_ids=None, key='param')
print(f"Processing task {task_id} with param {param}")
with DAG('yield_example', schedule_interval=None) as dag:
generate_task_instances_task = PythonOperator(
task_id='generate_task_instances',
python_callable=generate_task_instances
)
process_task_task = PythonOperator(
task_id='process_task',
python_callable=process_task,
provide_context=True
)
generate_task_instances_task >> process_task_task
在上面的示例中,generate_task_instances
函数是一个生成器函数,通过yield语句返回包含任务实例信息的字典。每次调用生成器函数时,它会生成一个新的任务实例字典。
process_task
函数是一个处理任务实例的函数,通过kwargs
参数可以访问任务实例的上下文信息,例如任务ID和参数值。在这个示例中,它通过xcom_pull
方法获取generate_task_instances_task
任务的返回值(即任务实例字典中的param键),并进行处理。
最后,使用PythonOperator
定义两个任务,将它们连接在一起,并使用provide_context=True
参数使任务函数能够接收任务实例的上下文信息。
这样,当DAG运行时,generate_task_instances
函数将会生成5个不同的任务实例,每个实例都会被传递给process_task
函数进行处理。