这是因为Airflow无法将PythonOperator等非原生数据类型序列化为JSON格式存储。因此,解决这个问题的一种方法是使用PostgresHook在数据库中将Python对象存储为字节码。请参考以下代码示例:
from airflow.contrib.hooks.postgres_hook import PostgresHook
def save_task_instance_to_db(ti): pg_hook = PostgresHook(postgres_conn_id='your_postgres_connection_id') # serialize the PythonOperator instance as a byte string serialized_op_instance = pickle.dumps(ti.task) # insert the serialized instance into the database pg_hook.run("INSERT INTO your_table (operator_instance) VALUES (%s)", parameters=(serialized_op_instance, ))
上一篇:Airflow:Kubernetes:权限拒绝,[require-container-resources-constraints]
下一篇:Airflow:psycopg2.ProgrammingError:can'tadapttype'PythonOperator'