这个问题是由于在Airflow中大数据插入这个操作符(BigQueryInsertJobOperator)中引用变量时所涉及的带有无效字符的变量名引起的。解决办法是,使用带有无效字符的变量名称时,在变量名称周围添加反引号。下面是一个示例代码,其中的task_id在它周围使用了反引号,以指示该名称含有无效字符。
from airflow.contrib.operators.bigquery_operator import BigQueryInsertJobOperator
task = BigQueryInsertJobOperator(
task_id='insert_data_to_bq',
configuration={
'query': {
'query': 'SELECT * FROM `mydataset.mytable`',
'useLegacySql': False
}
},
project_id='my_gcp_project',
bigquery_conn_id='my_bigquery_connection',
dag=dag)
如果您的查询要使用变量作为输入,则可以通过以下方式使用它们:
from airflow.operators import BashOperator
task = BashOperator(
task_id='my_task',
depends_on_past=False,
bash_command='echo {{ params.my_variable }}',
params={'my_variable': 'Hello World'},
dag=dag
)
上一篇:BigQueryInsertJobOperator无法通过Airflow插入1亿行数据。
下一篇:BigQueryIO - 只有在第一天创建表格,即使设置了CreateDisposition.CREATE_IF_NEEDED。