可以在DAG中使用BigQueryOperator来运行多个SQL文件,每个文件代表一个任务。以下是一个示例代码:
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from datetime import datetime
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2021, 1, 1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
}
dag = DAG('my_dag', default_args=default_args, schedule_interval=None)
task_1 = BigQueryOperator(
task_id='task_1',
use_legacy_sql=False,
sql='path/to/file_1.sql',
dag=dag,
)
task_2 = BigQueryOperator(
task_id='task_2',
use_legacy_sql=False,
sql='path/to/file_2.sql',
dag=dag,
)
task_3 = BigQueryOperator(
task_id='task_3',
use_legacy_sql=False,
sql='path/to/file_3.sql',
dag=dag,
)
task_1 >> task_2 >> task_3
在这个例子中,我们定义了三个任务(task_1, task_2, task_3),每个任务对应着一个SQL文件。每个任务都使用了BigQueryOperator来运行指定的SQL文件。最后,我们使用>>
符号来定义任务之间的依赖关系,以确定它们的运行顺序。