In Apache Airflow 2, you can use GoogleCloudStorageToBigQueryOperator and BigQueryToCloudStorageOperator to export data from BigQuery to Cloud Storage and load it back into BigQuery. At runtime, you may run into the following error:
Job Not Found: [
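For reference, in Airflow 2 these operators live in the Google provider package under new names; the contrib-style imports used in the example below should still resolve through deprecation shims, but the provider imports are the recommended form (this assumes the apache-airflow-providers-google package is installed):

# Airflow 2: the same operators under their provider-package names
# (requires the apache-airflow-providers-google package)
from airflow.providers.google.cloud.transfers.bigquery_to_gcs import BigQueryToGCSOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator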
To fix this error, use a new task ID whenever you read from or write to Cloud Storage. For example:

from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from google.cloud import bigquery, storage

bucket_name = 'my-bucket'

# Optional API clients; the operators below only need the bucket name
# and the fully qualified table ID, not these client objects.
gcs_client = storage.Client(project='my-project')
gcs_bucket = gcs_client.bucket(bucket_name)
bq_client = bigquery.Client(project='my-project')

table = 'my-project.my_dataset.my_table_name'
task_id = 'my_task_id'

# Export the BigQuery table to a CSV file whose name includes the task ID.
export_data = BigQueryToCloudStorageOperator(
    task_id='{}_export_data'.format(task_id),
    source_project_dataset_table=table,
    destination_cloud_storage_uris=[
        'gs://{}/exported_data_{}.csv'.format(bucket_name, task_id)
    ],
    export_format='CSV',
    field_delimiter=',',
    print_header=False,
    bigquery_conn_id='bigquery_default',
    google_cloud_storage_conn_id='google_cloud_default',
)

# Load the exported CSV back into BigQuery, overwriting the table.
import_data = GoogleCloudStorageToBigQueryOperator(
    task_id='{}_import_data'.format(task_id),
    bucket=bucket_name,
    source_objects=['exported_data_{}.csv'.format(task_id)],
    destination_project_dataset_table='my-project.my_dataset.my_table_name',
    schema_fields=TABLE_SCHEMA_FIELDS,  # schema of the destination table, defined elsewhere
    write_disposition='WRITE_TRUNCATE',
    google_cloud_storage_conn_id='google_cloud_default',
)
export_data >> import_data
Adding the task ID to the exported file names and to the operator task IDs avoids the "Job Not Found: [" error shown above.
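As a fuller illustration, here is a minimal sketch of how this pattern might be wired into a DAG that round-trips several tables, deriving one unique task ID per table so that neither the task IDs nor the exported file names collide. The DAG id, schedule, and table list are hypothetical, and schema autodetection is used in place of an explicit schema:

from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

BUCKET_NAME = 'my-bucket'
TABLES = ['my_table_name', 'my_other_table']  # hypothetical list of tables

with DAG(
    dag_id='bq_gcs_round_trip',       # hypothetical DAG id
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    for table_name in TABLES:
        task_id = table_name  # one unique ID per table

        # Export each table to its own, uniquely named CSV file.
        export_data = BigQueryToCloudStorageOperator(
            task_id='{}_export_data'.format(task_id),
            source_project_dataset_table='my-project.my_dataset.{}'.format(table_name),
            destination_cloud_storage_uris=[
                'gs://{}/exported_data_{}.csv'.format(BUCKET_NAME, task_id)
            ],
            export_format='CSV',
        )

        # Load the uniquely named file back into the corresponding table.
        import_data = GoogleCloudStorageToBigQueryOperator(
            task_id='{}_import_data'.format(task_id),
            bucket=BUCKET_NAME,
            source_objects=['exported_data_{}.csv'.format(task_id)],
            destination_project_dataset_table='my-project.my_dataset.{}'.format(table_name),
            write_disposition='WRITE_TRUNCATE',
            autodetect=True,
        )

        export_data >> import_data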