Airflow是一个流程管理工具,而DBT是一个数据建模工具。它们可以很好地配合使用,以创建完整的数据管道。以下是Airflow + DBT的配置步骤:
可以使用 pip 命令安装,例如:
pip install apache-airflow==2.1.4 dbt==0.20.2
在Airflow中,我们需要设置 DAG 和任务运行计划。DAG中需要包含运行DBT的任务。以下是一个简单的DAG示例:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
"owner": "airflow",
"depends_on_past": False,
"start_date": datetime(2021, 1, 1),
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
}
dag = DAG(
"dbt_dag",
default_args=default_args,
description="DAG running DBT tasks",
schedule_interval=timedelta(days=1),
)
dbt_task = BashOperator(
task_id="run_dbt",
bash_command="dbt run",
dag=dag,
)
dbt_task
我们需要设置一个 BashOperator 任务来运行 DBT 命令。在此示例中,我们只运行了 dbt run 命令,但实际的管道可能包含更多任务和脚本。
在DBT中,我们需要设置 profiles.yml 和 dbt_project.yml 文件。以下是进行这些设置的示例:
profiles.yml
my_profile:
target: dev
outputs:
dev:
type: postgres
host: localhost
port: 5432
user: my_user
password: my_password
dbname: my_db
schema: my_schema
threads: 4
keep