在Airflow代码中使用set时,需要注意set不是JSON可序列化的,需要将其转换为列表或元组。例如,在DAG的PythonOperator任务中使用set时,可能会遇到这个问题。下面是一个示例代码:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
def my_task():
my_set = set([1, 2, 3])
# do something with my_set
dag = DAG('my_dag', start_date=datetime.now())
t1 = PythonOperator(
task_id='my_task',
provide_context=False,
python_callable=my_task,
dag=dag
)
t1
在这个例子中,my_set是一个set,在my_task()中处理它。但是,如果我们直接这样运行代码,会得到一个“Set不是JSON可序列化的”错误。我们需要将其转换为列表或元组,如下所示:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
def my_task():
my_set = set([1, 2, 3])
my_list = list(my_set) # 将set转换为list
# do something with my_list
dag = DAG('my_dag', start_date=datetime.now())
t1 = PythonOperator(
task_id='my_task',
provide_context=False,
python_callable=my_task,
dag=dag
)
t1
现在,我们已经将my_set转换为my_list,这个问题就可以被解决了。
上一篇:Airflow任务因段错误失败