One workaround is to push the query result to XCom with xcom_push from a PythonOperator (with provide_context=True), and then retrieve it downstream with xcom_pull. Here is a code example:
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 6, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

with DAG('bigquery_test', default_args=default_args, schedule_interval='@hourly') as dag:
    # Run the query and write its output to a destination table.
    bq_query = BigQueryOperator(
        task_id='bq_query',
        bql='SELECT * FROM `..` WHERE ',
        use_legacy_sql=False,
        destination_dataset_table=':.',
        write_disposition='WRITE_TRUNCATE')

    def execute_and_push(**kwargs):
        # Count the rows in the destination table and push the count to XCom.
        from google.cloud import bigquery
        query_job = bigquery.Client().query('SELECT COUNT(*) FROM `..`')
        result = query_job.result()  # returns a RowIterator, not a DB-API cursor
        count = next(iter(result))[0]  # first field of the first (and only) row
        kwargs['ti'].xcom_push(key='bq_count', value=count)

    bq_push = PythonOperator(
        task_id='bq_push',
        python_callable=execute_and_push,
        provide_context=True)

    def pull_and_print(**kwargs):
        # Retrieve the count pushed by the bq_push task.
        count = kwargs['ti'].xcom_pull(key='bq_count')
        print(f'Total rows in new table = {count}')

    bq_pull = PythonOperator(
        task_id='bq_pull',
        python_callable=pull_and_print,
        provide_context=True)

    bq_query >> bq_push >> bq_pull
In this example, we query data from one table and store the result in another table. Then, in the execute_and_push PythonOperator, we push the row count of that result to XCom so downstream tasks can read it.
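One detail worth calling out: query_job.result() in the google-cloud-bigquery client returns a row iterator, not a DB-API cursor, so there is no fetchone() method. A minimal sketch of pulling the scalar out of the first row, using a plain list of tuples to stand in for the iterator:

```python
# Sketch: a COUNT(*) query yields exactly one row with one field.
# `rows` is a plain list standing in for query_job.result(),
# which yields indexable row objects.
rows = [(42,)]

# Take the first field of the first row.
count = next(iter(rows))[0]
print(count)  # 42
```

The same next(iter(result))[0] pattern works directly on the real RowIterator, since iterating it yields Row objects that support positional indexing.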