将 BigQueryGetDataOperator 替换为 BigQueryHook,通过 query 方法获取数据,再返回结果。
代码示例:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
def get_data(**kwargs):
hook = BigQueryHook(
bigquery_conn_id='google_cloud_default',
use_legacy_sql=False,
)
conn = hook.get_conn()
cursor = conn.cursor()
cursor.execute("""
SELECT ...
""")
result = cursor.fetchall()
return result
with DAG('example_dag', schedule_interval=None, catchup=False) as dag:
task1 = PythonOperator(
task_id='get_data',
python_callable=get_data,
)
在 get_data
方法中,使用 BigQueryHook 创建连接,然后使用 cursor 执行 SQL 查询语句,并通过 fetchall
获取结果。最后将结果作为返回值返回。
然后在 DAG 中使用 PythonOperator 将 get_data
方法作为 python_callable 引入。
上一篇:AirflowBigQueryGetDataOperator不返回迭代器(8.4.0)
下一篇:AirflowBigQueryGetDataOperatordoesnotreturnaniterable(8.4.0)