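This error usually occurs when you try to serialize a Python object that pickle cannot handle. A common fix is to use cloudpickle instead of pickle. In Airflow, you can apply this inside the callable of a PythonOperator (or a ShortCircuitOperator) in your DAG. Here is an example: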
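from datetime import datetime

import cloudpickle
import pyodbc
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator


def read_data():
    # Open the connection, run the query, then release the cursor and connection.
    conn = pyodbc.connect(
        "DRIVER={SQL Server};SERVER=server_name;DATABASE=db_name;Trusted_Connection=yes"
    )
    cursor = conn.cursor()
    query = "SELECT * FROM table_name"
    cursor.execute(query)
    results = cursor.fetchall()
    cursor.close()
    conn.close()
    # Return plain tuples instead of driver row objects, so the result
    # contains only built-in types that serialize cleanly.
    return [tuple(row) for row in results]


def my_func():
    data = read_data()
    # process data here
    # ...


def serialize_my_func():
    # Serialize the function itself with cloudpickle.
    # Note: the return value is pushed to XCom; depending on the Airflow
    # version and configuration, raw bytes may require XCom pickling to be enabled.
    data = cloudpickle.dumps(my_func)
    return data


# start_date is a placeholder value (an assumption); set it for your own schedule.
dag = DAG(dag_id="my_dag", start_date=datetime(2023, 1, 1))

task_a = PythonOperator(
    task_id="task_a",
    python_callable=serialize_my_func,
    dag=dag,
)
task_b = DummyOperator(task_id="task_b", dag=dag)

task_a >> task_b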
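
To see why some objects fail to pickle, and why copying query results into plain tuples tends to avoid the problem, here is a minimal sketch that uses only the standard library. `FakeRow`, `is_picklable`, and `to_plain_rows` are hypothetical names made up for this illustration. `FakeRow` is only a stand-in for a row-like object that holds an unpicklable handle; it is not part of pyodbc or Airflow, and it does not claim to reproduce how pyodbc rows behave.

```python
import pickle
import threading


class FakeRow:
    """Hypothetical stand-in for a row-like object holding an unpicklable handle."""

    def __init__(self, *values):
        self.values = values
        self._lock = threading.Lock()  # lock objects cannot be pickled

    def __iter__(self):
        return iter(self.values)


def is_picklable(obj):
    """Return True if pickle.dumps(obj) succeeds, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, TypeError, AttributeError):
        return False


def to_plain_rows(rows):
    """Copy each row-like object into a tuple of its values."""
    return [tuple(row) for row in rows]


rows = [FakeRow(1, "a"), FakeRow(2, "b")]
plain = to_plain_rows(rows)

print(is_picklable(rows))   # the lock inside each FakeRow blocks pickling
print(is_picklable(plain))  # tuples of ints and strings pickle without issue
```

A check like this can help you find out which object in a task's return value triggers the error before deciding whether a different serializer is needed at all.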