在Airflow中,动态任务是通过返回Python字典来生成任务的。如果任务返回列表而不是字典,则会出现问题。以下是
确认任务返回一个字典而不是一个列表。
如果任务返回一个列表,则可以将其转换为字典。例如:
def my_dynamic_task():
my_list = [1, 2, 3]
my_dict = {}
for i, item in enumerate(my_list):
my_dict[i] = item
return my_dict
from airflow.operators.python_operator import PythonOperator
my_list = [1, 2, 3]
def my_task_callable():
for item in my_list:
print(item)
my_task = PythonOperator(
task_id='my_task',
python_callable=my_task_callable,
dag=dag,
)
在上面的代码中,我们手动定义了一个PythonOperator,并将其设置为运行一个可调用函数。该函数可以访问任务所需的任何数据。