BigQueryGetDataOperator 获取的数据不便于按行迭代处理。解决方法是继承该算子并重写 execute 方法(无需修改 Airflow 源码),使其返回可迭代的结果。具体代码如下:
代码示例:
from airflow.contrib.operators import bigquery_get_data
from airflow.contrib.hooks import bigquery_hook
from typing import List, Tuple
# Subclass of BigQueryGetDataOperator whose execute() returns an iterable of rows.
class IterBigQueryGetDataOperator(bigquery_get_data.BigQueryGetDataOperator):
    """BigQueryGetDataOperator that can also fetch the rows of a SQL query.

    If ``sql`` is given, the query is run through the cursor obtained from
    ``BigQueryHook.get_conn()`` and all of its rows are returned. Without
    ``sql`` the operator falls back to the parent's ``execute``.

    ``execute`` returns a ``list``. Callers can iterate it directly
    (``for row in op.execute(context): ...``). It is deliberately not a
    generator: Airflow calls ``execute`` without iterating its result and
    hands the return value to XCom. A generator body would therefore never
    run, and the generator object itself cannot be serialized.
    """

    def __init__(self, *args, sql=None, parameters=None, **kwargs):
        """
        :param sql: query to run instead of the parent's table read.
        :param parameters: query parameters forwarded to ``cursor.execute``.
        All other arguments are passed to BigQueryGetDataOperator unchanged.
        """
        if sql is not None and not args:
            # dataset_id/table_id are not used when a query is given.
            # Default them so query-only callers need not pass them.
            # NOTE(review): assumes the parent requires these two arguments
            # (as in airflow.contrib) - confirm against the installed Airflow.
            kwargs.setdefault("dataset_id", None)
            kwargs.setdefault("table_id", None)
        super().__init__(*args, **kwargs)
        self.sql = sql
        self.parameters = parameters

    def execute(self, context):
        """Fetch the rows and return them as a list."""
        if self.sql is None:
            # No query configured: behave exactly like the parent operator.
            return super().execute(context)
        # Do not call super().execute() here. It would run the parent's
        # table read, and that result would only be discarded.
        hook = bigquery_hook.BigQueryHook(
            bigquery_conn_id=self.gcp_conn_id,
            delegate_to=self.delegate_to,
        )
        cursor = hook.get_conn().cursor()
        cursor.execute(self.sql, self.parameters)
        # Materialize so the return value is a plain, XCom-friendly list.
        return list(cursor.fetchall())
# Example: use IterBigQueryGetDataOperator to retrieve the rows of a query.
# NOTE(review): `Task` is not imported in this snippet. Confirm where it comes
# from and whether its __init__ must be called.
class MyTask(Task):
    """Run a SQL query through IterBigQueryGetDataOperator and collect the rows."""

    def __init__(self, sql: str):
        self.sql = sql

    def run(self) -> List[Tuple]:
        """Execute the query and return the collected rows.

        :return: every row yielded by the operator's ``execute``, in order.
        """
        op = IterBigQueryGetDataOperator(
            task_id='my_task',
            bigquery_conn_id='bigquery_default',
            sql=self.sql,
        )
        data: List[Tuple] = []
        # Called outside the Airflow scheduler, so an empty context is passed.
        for row in op.execute(context={}):
            # do something with the row data
            data.append(row)
        # Return the rows so callers can use them (they were previously dropped).
        return data